diff --git a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs index 609233ed6c4..2d8fe534911 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs @@ -75,11 +75,10 @@ use tokio::runtime::Handle; use tracing::info; use ulid::Ulid; -use super::merge_order::{MergeRun, compute_merge_order, compute_output_boundaries}; -use super::schema::{align_inputs_to_union_schema, optimize_output_batch}; +use super::merge_order::{MergeRun, compute_merge_order}; +use super::schema::align_inputs_to_union_schema; use super::writer::{ - apply_merge_permutation, build_merge_kv_metadata, build_sorting_columns, - resolve_sort_field_names, verify_sort_order, + apply_merge_permutation, build_merge_kv_metadata, resolve_sort_field_names, verify_sort_order, }; use super::{InputMetadata, MergeConfig, MergeOutputFile}; use crate::row_keys; @@ -127,22 +126,28 @@ pub async fn streaming_merge_sorted_parquet_files( bail!("num_outputs must be at least 1"); } - // Validate that all inputs are single-RG (or zero-RG, which means - // the file has no data). PR-6b.2 simplification — see module docs. - for (idx, stream) in inputs.iter().enumerate() { - let num_rgs = stream.metadata().num_row_groups(); - if num_rgs > 1 { - bail!( - "streaming merge requires single-row-group inputs in PR-6b.2 (input {idx} has \ - {num_rgs} row groups); multi-RG metric-aligned inputs land in a follow-up. \ - Legacy multi-RG (rg_partition_prefix_len=0) inputs must go through the \ - PR-5 adapter, which presents them as a single synthetic row group." - ); + let input_meta = extract_and_validate_input_metadata(&inputs)?; + + // Reject legacy multi-RG inputs (`rg_partition_prefix_len == 0` + // AND any input has >1 row group). These have no alignment claim, + // so RG boundaries are arbitrary row counts that may split a + // single sort-key value across two RGs. The streaming engine + // cannot determine merge regions without column-chunk-bounded + // buffering; such inputs must go through PR-5's + // `LegacyMultiRGAdapter`, which presents them as one synthetic + // single-RG stream. + if input_meta.rg_partition_prefix_len == 0 { + for (idx, stream) in inputs.iter().enumerate() { + let num_rgs = stream.metadata().num_row_groups(); + if num_rgs > 1 { + bail!( + "legacy multi-RG inputs (rg_partition_prefix_len=0) must go through the PR-5 \ + adapter — input {idx} has {num_rgs} row groups with no alignment claim" + ); + } } } - let input_meta = extract_and_validate_input_metadata(&inputs)?; - info!( num_inputs = inputs.len(), num_outputs = config.num_outputs, @@ -166,50 +171,98 @@ pub async fn streaming_merge_sorted_parquet_files( let mut inputs = inputs; let mut decoders_state = build_input_decoders_state(&mut inputs)?; - // Phase 0 - let sort_col_batches = - drain_sort_cols_all_inputs(&handle, &mut decoders_state, &input_meta.sort_fields)?; + // Pre-compute regions from RG metadata. With prefix_len >= 1 + // each region is one sort-prefix value across inputs (each + // contributing input has exactly one RG in that region). With + // prefix_len == 0 (validated single-RG above) there is one + // region covering all inputs. + let regions = extract_regions_from_metadata(&decoders_state, &input_meta)?; - if sort_col_batches.iter().all(|b| b.num_rows() == 0) { + if regions.is_empty() { info!("all inputs empty, producing no output"); return Ok(Vec::new()); } - // Phase 1: align inputs to a union sort-col schema so the merge-order - // comparator sees uniformly-typed `sorted_series` + `timestamp_secs`. - let (sort_union_schema, aligned_sort_batches) = - align_inputs_to_union_schema(&sort_col_batches, &input_meta.sort_fields)?; - let merge_order = compute_merge_order(&aligned_sort_batches, &input_meta.sort_fields)?; - - // Phase 2: split merge order into M outputs at sorted_series boundaries. - let boundaries = - compute_output_boundaries(&merge_order, &aligned_sort_batches, num_outputs)?; + let total_rows: usize = regions.iter().map(|r| r.total_rows()).sum(); + let assignments = assign_regions_to_output_files(®ions, num_outputs); + let effective_num_outputs = assignments + .iter() + .max() + .copied() + .map(|m| m + 1) + .unwrap_or(0); - let total_rows: usize = aligned_sort_batches.iter().map(|b| b.num_rows()).sum(); info!( total_rows, - num_outputs = boundaries.len(), - "streaming merge order computed" + num_regions = regions.len(), + num_outputs = effective_num_outputs, + "streaming merge regions computed" ); - // Pre-compute per-input row → (output_idx, output_position) destination map. - // Used by every column write to slice take/interleave indices per page. - let destinations = - build_input_row_destinations(&aligned_sort_batches, &merge_order, &boundaries); - - // Phase 3 - let outputs = write_streaming_outputs( - &handle, - &mut decoders_state, - &aligned_sort_batches, - &sort_union_schema, - &merge_order, - &boundaries, - &destinations, - &input_meta, - &writer_config, - &output_dir, - )?; + // Build the union schema once across all inputs' arrow schemas. + // Used for the output files' declared schema (PR-6b.2 also did + // per-output dict-encoding optimisation; we drop that here in + // exchange for not needing to accumulate sort cols across + // regions before opening writers). + let arrow_schemas: Vec = decoders_state + .iter() + .map(|s| Arc::clone(&s.arrow_schema)) + .collect(); + let union_schema = + build_full_union_schema_from_arrow_schemas(&arrow_schemas, &input_meta.sort_fields)?; + + // Per-region processing loop. We hold AT MOST ONE writer open + // at a time (the current output file being written). Each + // region produces one output RG inside the current writer; when + // the assignment moves to a new output file, we close the + // current writer (finalising the MergeOutputFile) and open a + // new one. + let mut outputs: Vec = Vec::new(); + let mut current_writer: Option = None; + let mut current_accumulator: Option = None; + let mut current_output_idx: Option = None; + + for (region_idx, region) in regions.iter().enumerate() { + let target_output_idx = assignments[region_idx]; + + if current_output_idx != Some(target_output_idx) { + // Close the previous writer if any. + if let (Some(w), Some(acc)) = (current_writer.take(), current_accumulator.take()) { + outputs.push(finalize_output(w, acc, &input_meta)?); + } + // Open a new writer for `target_output_idx`. + let writer = open_output_writer_for_streaming( + target_output_idx, + &output_dir, + &union_schema, + &input_meta, + &writer_config, + )?; + current_writer = Some(writer); + current_accumulator = Some(OutputAccumulator::new(target_output_idx)); + current_output_idx = Some(target_output_idx); + } + + // Process this region into the current writer (adds one RG). + process_region( + &handle, + &mut decoders_state, + current_writer + .as_mut() + .expect("writer opened above for this region"), + current_accumulator + .as_mut() + .expect("accumulator opened above for this region"), + region, + &union_schema, + &input_meta, + )?; + } + + // Close the last writer. + if let (Some(w), Some(acc)) = (current_writer.take(), current_accumulator.take()) { + outputs.push(finalize_output(w, acc, &input_meta)?); + } // MC-1: total row count preserved. let output_total: usize = outputs.iter().map(|o| o.num_rows).sum(); @@ -399,9 +452,13 @@ fn drain_sort_cols_all_inputs( decoders_state: &mut [InputDecoderState], sort_fields_str: &str, ) -> Result> { + // Single-region path: drain RG 0 of each input. Used by the + // single-region streaming path (one region covering all inputs; + // applies when all inputs are single-RG OR `rg_partition_prefix_len + // == 0` with one synthetic adapter-presented RG per input). let mut batches = Vec::with_capacity(decoders_state.len()); for (idx, state) in decoders_state.iter_mut().enumerate() { - let batch = drain_sort_cols_one_input(handle, state, sort_fields_str, idx)?; + let batch = drain_sort_cols_one_input(handle, state, sort_fields_str, idx, 0)?; if batch.num_columns() > 0 && batch.schema().index_of(SORTED_SERIES_COLUMN).is_err() { bail!( "input {idx} is missing the '{}' column required for merge", @@ -413,14 +470,635 @@ fn drain_sort_cols_all_inputs( Ok(batches) } +// ============================================================================ +// Region pre-computation (multi-RG metric-aligned inputs) +// ============================================================================ + +/// One merge region: a contiguous slice of the merged output, where all +/// contributing inputs share the same sort-prefix value (e.g., one +/// `metric_name` when `rg_partition_prefix_len == 1`). +/// +/// For multi-RG metric-aligned inputs, each region corresponds to **at +/// most one row group per input** — the property that makes per-region +/// streaming work without column-chunk-bounded buffering. +#[derive(Debug, Clone)] +struct Region { + /// Sort-prefix value identifying this region (e.g., `metric_name` + /// bytes for `prefix_len == 1`). Used only for ordering and + /// diagnostics; the merge engine doesn't decode this value. + prefix_key: Vec, + /// `(input_idx, rg_idx, num_rows)` for each input that contributes + /// to this region. Ordered by `input_idx`. + contributing: Vec<(usize, usize, usize)>, +} + +impl Region { + fn total_rows(&self) -> usize { + self.contributing.iter().map(|(_, _, n)| *n).sum() + } +} + +/// Extract the metric_name (or first sort-prefix col) bytes for a given +/// input row group, from the column chunk's min stats. Requires +/// min == max (metric-aligned RG); returns an error otherwise. +fn extract_rg_prefix_key( + metadata: &ParquetMetaData, + rg_idx: usize, + prefix_col_parquet_idx: usize, + input_idx: usize, +) -> Result> { + let rg_meta = metadata.row_group(rg_idx); + let col_chunk = rg_meta.column(prefix_col_parquet_idx); + let stats = col_chunk.statistics().ok_or_else(|| { + anyhow!( + "input {input_idx} rg {rg_idx} has no statistics on the prefix column \ + — cannot determine metric-alignment without min/max" + ) + })?; + let (min_bytes, max_bytes) = match stats { + parquet::file::statistics::Statistics::ByteArray(v) => ( + v.min_bytes_opt().map(|b| b.to_vec()), + v.max_bytes_opt().map(|b| b.to_vec()), + ), + parquet::file::statistics::Statistics::FixedLenByteArray(v) => ( + v.min_bytes_opt().map(|b| b.to_vec()), + v.max_bytes_opt().map(|b| b.to_vec()), + ), + other => bail!( + "input {input_idx} rg {rg_idx} prefix col stats are not byte-array (got {other:?}) — \ + metric_name should be a string column", + ), + }; + let min = min_bytes.ok_or_else(|| { + anyhow!( + "input {input_idx} rg {rg_idx} prefix col has no min value in stats — \ + cannot determine metric-alignment" + ) + })?; + let max = max_bytes.ok_or_else(|| { + anyhow!( + "input {input_idx} rg {rg_idx} prefix col has no max value in stats — \ + cannot determine metric-alignment" + ) + })?; + if min != max { + bail!( + "input {input_idx} rg {rg_idx} is NOT metric-aligned: prefix col min ({:?}) != max \ + ({:?}). Streaming engine requires `rg_partition_prefix_len >= 1` for multi-RG inputs.", + String::from_utf8_lossy(&min), + String::from_utf8_lossy(&max), + ); + } + Ok(min) +} + +/// Find the parquet leaf column index for the first sort-schema column +/// (the "prefix col" used for RG alignment). Returns the index in the +/// parquet schema's flat column list. Errors if not found. +fn find_prefix_parquet_col_idx( + metadata: &ParquetMetaData, + sort_fields_str: &str, + input_idx: usize, +) -> Result { + let sort_field_schema = parse_sort_fields(sort_fields_str)?; + let first_sort_col_name = sort_field_schema + .column + .first() + .ok_or_else(|| anyhow!("sort schema is empty"))? + .name + .clone(); + + let parquet_schema = metadata.file_metadata().schema_descr(); + for (col_idx, col) in parquet_schema.columns().iter().enumerate() { + let name = col.path().parts()[0].to_string(); + if name == first_sort_col_name { + return Ok(col_idx); + } + } + bail!( + "input {input_idx} parquet schema is missing the prefix sort column '{}'", + first_sort_col_name, + ); +} + +/// Build the region list across all inputs. +/// +/// - If `rg_partition_prefix_len == 0`: all inputs must be single-RG +/// (caller's job to validate); produces ONE region with each input's +/// only RG. The region's prefix_key is empty (no alignment claim). +/// - If `rg_partition_prefix_len >= 1`: reads each input's per-RG +/// prefix col stats (must have `min == max`), groups RGs across +/// inputs by prefix_key, sorts regions by prefix_key. +/// +/// Returns regions in sort order (sort prefix ASC). +fn extract_regions_from_metadata( + decoders_state: &[InputDecoderState], + input_meta: &InputMetadata, +) -> Result> { + if input_meta.rg_partition_prefix_len == 0 { + // No alignment claim: single region covering each input's only RG. + // Multi-RG inputs with prefix_len == 0 are rejected earlier; here + // each input is single-RG (or zero-RG). + let mut contributing = Vec::new(); + for (idx, state) in decoders_state.iter().enumerate() { + if state.metadata.num_row_groups() == 0 { + continue; + } + let rg_meta = state.metadata.row_group(0); + contributing.push((idx, 0, rg_meta.num_rows() as usize)); + } + if contributing.is_empty() { + return Ok(Vec::new()); + } + return Ok(vec![Region { + prefix_key: Vec::new(), + contributing, + }]); + } + + // Prefix_len >= 1: build regions by prefix key from per-RG stats. + use std::collections::BTreeMap; + let mut by_prefix: BTreeMap, Vec<(usize, usize, usize)>> = BTreeMap::new(); + + for (input_idx, state) in decoders_state.iter().enumerate() { + if state.metadata.num_row_groups() == 0 { + continue; + } + let prefix_col_idx = + find_prefix_parquet_col_idx(&state.metadata, &input_meta.sort_fields, input_idx)?; + for rg_idx in 0..state.metadata.num_row_groups() { + let prefix_key = + extract_rg_prefix_key(&state.metadata, rg_idx, prefix_col_idx, input_idx)?; + let num_rows = state.metadata.row_group(rg_idx).num_rows() as usize; + by_prefix + .entry(prefix_key) + .or_default() + .push((input_idx, rg_idx, num_rows)); + } + } + + Ok(by_prefix + .into_iter() + .map(|(prefix_key, contributing)| Region { + prefix_key, + contributing, + }) + .collect()) +} + +/// Assign each region to an output file index. +/// +/// Splits the region list across `num_outputs` files, balancing +/// cumulative row count. Each output file gets a contiguous slice of +/// the region list (preserving sort-prefix order so output files have +/// non-overlapping key ranges). Returns a `Vec` indexed by +/// region_idx with the target output file index. +/// +/// If `regions.len() < num_outputs`, fewer output files are produced +/// (matches the non-streaming engine's behaviour when there aren't +/// enough split points). +fn assign_regions_to_output_files(regions: &[Region], num_outputs: usize) -> Vec { + let total_rows: usize = regions.iter().map(|r| r.total_rows()).sum(); + let effective_num_outputs = num_outputs.min(regions.len()).max(1); + let target_rows_per_output = total_rows.div_ceil(effective_num_outputs).max(1); + + let mut assignments = Vec::with_capacity(regions.len()); + let mut current_output = 0; + let mut accumulated = 0; + for region in regions { + // If this region would push us past the target AND we have + // budget to start a new output AND the current output already + // has rows, advance to next output BEFORE assigning. + if accumulated > 0 + && accumulated + region.total_rows() > target_rows_per_output + && current_output + 1 < effective_num_outputs + { + current_output += 1; + accumulated = 0; + } + assignments.push(current_output); + accumulated += region.total_rows(); + } + assignments +} + +// ============================================================================ +// Per-region processing (writer + accumulator state) +// ============================================================================ + +/// Per-output-file mutable state owned across regions assigned to that file. +struct OutputWriterStorage { + output_idx: usize, + output_path: PathBuf, + writer: StreamingParquetWriter, +} + +/// Per-output-file accumulator. Each region's sort-col contribution is +/// merged into `accumulated_sort_batch`; per-output metadata +/// (row_keys, zonemap, metric_names, time_range) is computed once at +/// `finalize_output` time. Service names are collected during the +/// streaming write of the service body col within each region. +struct OutputAccumulator { + output_idx: usize, + /// Concatenated sort-col `RecordBatch` across all regions written + /// to this output. Memory bounded by total sort col bytes in the + /// output file (small — sort cols are narrow). + accumulated_sort_batch: Option, + /// Service names collected across regions' body-col writes for + /// this output file. + service_names: HashSet, + /// Cumulative row count = sum of regions' total_rows assigned here. + num_rows: usize, +} + +impl OutputAccumulator { + fn new(output_idx: usize) -> Self { + Self { + output_idx, + accumulated_sort_batch: None, + service_names: HashSet::new(), + num_rows: 0, + } + } + + fn append_sort_batch(&mut self, batch: RecordBatch) -> Result<()> { + match self.accumulated_sort_batch.take() { + None => { + self.accumulated_sort_batch = Some(batch); + } + Some(prev) => { + let schema = prev.schema(); + let combined = arrow::compute::concat_batches(&schema, [&prev, &batch].into_iter()) + .context("appending region sort batch to output accumulator")?; + self.accumulated_sort_batch = Some(combined); + } + } + Ok(()) + } +} + +/// Open a streaming Parquet writer for one output file. Caller is +/// responsible for calling `start_row_group` per region and writing +/// columns. +fn open_output_writer_for_streaming( + output_idx: usize, + output_dir: &Path, + union_schema: &SchemaRef, + input_meta: &InputMetadata, + writer_config: &crate::storage::ParquetWriterConfig, +) -> Result { + let output_prefix_len = input_meta.rg_partition_prefix_len; + // KV metadata is computed once with placeholder row_keys / zonemap + // (None / empty); `finalize_output` rewrites those after sort col + // accumulation completes. But the writer fixes its KV metadata at + // construction time, so we have to declare it upfront. For now we + // declare with placeholders and the **accumulated** sort batch + // overrides at finalize via a separate step. The placeholders are + // valid (no `qh.row_keys` or `qh.zonemap_regexes` is OK — they're + // optional). + let kv_entries = build_merge_kv_metadata(input_meta, &None, &HashMap::new(), output_prefix_len); + + // sorting_columns and sort_field_names declared up front from union schema. + let sort_field_names = resolve_sort_field_names(&input_meta.sort_fields)?; + let sorting_cols = build_sorting_columns_from_schema(union_schema, &input_meta.sort_fields)?; + + let props = writer_config.to_writer_properties_with_metadata( + union_schema, + sorting_cols, + Some(kv_entries), + &sort_field_names, + ); + + let output_filename = format!("merge_output_{}.parquet", Ulid::new()); + let output_path = output_dir.join(&output_filename); + let file = std::fs::File::create(&output_path) + .with_context(|| format!("creating output file: {}", output_path.display()))?; + let writer = StreamingParquetWriter::try_new(file, Arc::clone(union_schema), props) + .with_context(|| format!("opening streaming writer for output {output_idx}"))?; + + Ok(OutputWriterStorage { + output_idx, + output_path, + writer, + }) +} + +/// Compute `SortingColumn` entries from the union schema (no +/// RecordBatch needed — we just need the col indices). +fn build_sorting_columns_from_schema( + schema: &SchemaRef, + sort_fields_str: &str, +) -> Result> { + let parsed = parse_sort_fields(sort_fields_str)?; + let mut cols = Vec::new(); + for sf in &parsed.column { + // Schema may use `timestamp_secs` for what the sort schema + // calls `timestamp`. Match the existing alias handling. + let resolved = + if is_timestamp_column_name(&sf.name) && schema.index_of("timestamp_secs").is_ok() { + "timestamp_secs" + } else { + sf.name.as_str() + }; + let Ok(col_idx) = schema.index_of(resolved) else { + continue; + }; + cols.push(parquet::file::metadata::SortingColumn { + column_idx: col_idx as i32, + descending: sf.sort_direction + == quickwit_proto::sortschema::SortColumnDirection::SortDirectionDescending as i32, + nulls_first: false, + }); + } + Ok(cols) +} + +/// Process one merge region: drain sort cols of contributing inputs' +/// current RGs, compute the region's merge order, open a new output +/// RG in the writer, write all cols (sort cols via interleave, body +/// cols via the page-bounded assembler), close the RG, accumulate +/// per-output static metadata. +#[allow(clippy::too_many_arguments)] +fn process_region( + handle: &Handle, + decoders_state: &mut [InputDecoderState], + writer_state: &mut OutputWriterStorage, + accumulator: &mut OutputAccumulator, + region: &Region, + union_schema: &SchemaRef, + input_meta: &InputMetadata, +) -> Result<()> { + // 1. Drain sort cols of contributing inputs' RGs. + // The result is a Vec, indexed BY GLOBAL INPUT INDEX + // (with zero-row placeholders for non-contributing inputs) so the + // BodyColOutputPageAssembler can use global input indices. + let num_inputs = decoders_state.len(); + let mut sort_col_batches: Vec> = (0..num_inputs).map(|_| None).collect(); + for &(input_idx, rg_idx, _num_rows) in ®ion.contributing { + let batch = drain_sort_cols_one_input( + handle, + &mut decoders_state[input_idx], + &input_meta.sort_fields, + input_idx, + rg_idx, + )?; + if batch.num_columns() > 0 && batch.schema().index_of(SORTED_SERIES_COLUMN).is_err() { + bail!( + "input {input_idx} rg {rg_idx} is missing the '{}' column required for merge", + SORTED_SERIES_COLUMN, + ); + } + sort_col_batches[input_idx] = Some(batch); + } + + // Materialise into a `Vec` per input. Non-contributing + // inputs get zero-row placeholders with the input's sort col schema + // so `compute_merge_order` and the body col assembler see uniform + // shapes. + let mut sort_batch_vec: Vec = Vec::with_capacity(num_inputs); + for (idx, slot) in sort_col_batches.into_iter().enumerate() { + let batch = match slot { + Some(b) => b, + None => empty_sort_col_record_batch(&decoders_state[idx], &input_meta.sort_fields)?, + }; + sort_batch_vec.push(batch); + } + + // 2. Align to union sort schema for the merge-order comparator. + let (sort_union_schema, aligned_sort_batches) = + align_inputs_to_union_schema(&sort_batch_vec, &input_meta.sort_fields)?; + + // 3. Compute merge order for this region. + let merge_order = compute_merge_order(&aligned_sort_batches, &input_meta.sort_fields)?; + let region_rows: usize = merge_order.iter().map(|r| r.row_count).sum(); + if region_rows == 0 { + return Ok(()); + } + + // 4. Apply the merge permutation to the sort col batches to get the + // region's sorted sort-col batch. This will be appended to the + // output accumulator; also used to compute take indices for the + // body col assembler. + let region_sort_batch = + apply_merge_permutation(&aligned_sort_batches, &sort_union_schema, &merge_order) + .context("applying merge permutation for region sort cols")?; + + // MC-3: verify the region's output is sorted. + verify_sort_order(®ion_sort_batch, &input_meta.sort_fields); + + // 5. Build per-region destinations: maps (input_idx, input_row) → + // (output_idx=0, position_in_region). The body col assembler + // walks this to find which (input, row) contributes each output + // position. + let mut destinations: Vec>> = aligned_sort_batches + .iter() + .map(|b| vec![None; b.num_rows()]) + .collect(); + let mut pos = 0usize; + for run in &merge_order { + for r in 0..run.row_count { + destinations[run.input_index][run.start_row + r] = Some((0, pos)); + pos += 1; + } + } + let region_destinations = InputRowDestinations { + per_input: destinations, + rows_per_output: vec![region_rows], + }; + + // 6. Open a new output RG and write all cols in union schema order. + let mut row_group = writer_state.writer.start_row_group().with_context(|| { + format!( + "opening row group for output {} region", + writer_state.output_idx, + ) + })?; + + for (col_idx, field) in union_schema.fields().iter().enumerate() { + let col_name = field.name(); + if sort_union_schema.index_of(col_name).is_ok() { + // Sort col: take from the already-built region_sort_batch. + let arrays = build_sort_col_pages_from_sorted_batch(®ion_sort_batch, col_name)?; + row_group + .write_next_column_arrays(arrays.into_iter()) + .with_context(|| { + format!( + "writing sort col '{col_name}' (col_idx {col_idx}) to output {}", + writer_state.output_idx, + ) + })?; + } else { + // Body col: stream via the page-bounded assembler. + let mut input_col_indices: Vec> = Vec::with_capacity(num_inputs); + for state in decoders_state.iter() { + input_col_indices.push(state.arrow_schema.index_of(col_name).ok()); + } + + let track_service = col_name == "service"; + + let assembler = BodyColOutputPageAssembler::new( + handle, + decoders_state, + &input_col_indices, + ®ion_destinations, + 0, // out_idx is always 0 within a single-region call + col_name, + field.as_ref(), + ); + + let pages: Vec = assembler + .into_iter() + .collect::>>() + .with_context(|| { + format!( + "assembling body col '{col_name}' for output {} region", + writer_state.output_idx, + ) + })?; + + if track_service { + for page in &pages { + collect_service_names_from_page(page.as_ref(), &mut accumulator.service_names)?; + } + } + + row_group + .write_next_column_arrays(pages.into_iter()) + .with_context(|| { + format!( + "writing body col '{col_name}' to output {} region", + writer_state.output_idx, + ) + })?; + } + } + + row_group.finish().with_context(|| { + format!( + "finishing region row group for output {}", + writer_state.output_idx + ) + })?; + + // 7. Accumulate this region's contribution to the output. + accumulator.append_sort_batch(region_sort_batch)?; + accumulator.num_rows += region_rows; + + Ok(()) +} + +/// Helper for sort col writes within a region: split the region's +/// already-sorted sort col into page-sized chunks for +/// `write_next_column_arrays`. +fn build_sort_col_pages_from_sorted_batch( + sorted_batch: &RecordBatch, + col_name: &str, +) -> Result> { + let col_idx = sorted_batch + .schema() + .index_of(col_name) + .with_context(|| format!("missing sort col '{col_name}' in region sorted batch"))?; + let col = sorted_batch.column(col_idx); + let total_rows = col.len(); + let mut pages = Vec::with_capacity(total_rows.div_ceil(OUTPUT_PAGE_ROWS)); + let mut start = 0; + while start < total_rows { + let len = (total_rows - start).min(OUTPUT_PAGE_ROWS); + pages.push(col.slice(start, len)); + start += len; + } + Ok(pages) +} + +/// Finalize one output file: close writer, gather size, compute +/// per-output static metadata from the accumulator's sort col data, +/// return the `MergeOutputFile`. +fn finalize_output( + writer_state: OutputWriterStorage, + accumulator: OutputAccumulator, + input_meta: &InputMetadata, +) -> Result { + let OutputWriterStorage { + output_idx, + output_path, + writer, + } = writer_state; + let _metadata = writer + .close() + .with_context(|| format!("closing writer for output {output_idx}"))?; + + let size_bytes = std::fs::metadata(&output_path) + .with_context(|| format!("stat output file: {}", output_path.display()))? + .len(); + + let sort_batch = accumulator + .accumulated_sort_batch + .unwrap_or_else(|| RecordBatch::new_empty(Arc::new(ArrowSchema::empty()))); + + let row_keys_proto = if sort_batch.num_rows() > 0 { + row_keys::extract_row_keys(&input_meta.sort_fields, &sort_batch) + .with_context(|| format!("extracting row keys for output {output_idx}"))? + .map(|rk| row_keys::encode_row_keys_proto(&rk)) + } else { + None + }; + + let zonemap_opts = ZonemapOptions::default(); + let zonemap_regexes = if sort_batch.num_rows() > 0 { + zonemap::extract_zonemap_regexes(&input_meta.sort_fields, &sort_batch, &zonemap_opts) + .with_context(|| format!("extracting zonemap regexes for output {output_idx}"))? + } else { + HashMap::new() + }; + + let metric_names = + if sort_batch.num_rows() > 0 && sort_batch.schema().index_of("metric_name").is_ok() { + extract_metric_names(&sort_batch) + .with_context(|| format!("extracting metric names for output {output_idx}"))? + } else { + HashSet::new() + }; + + let time_range = + if sort_batch.num_rows() > 0 && sort_batch.schema().index_of("timestamp_secs").is_ok() { + extract_time_range(&sort_batch) + .with_context(|| format!("extracting time range for output {output_idx}"))? + } else { + crate::split::TimeRange::new(0, 0) + }; + + let mut low_cardinality_tags: HashMap> = HashMap::new(); + if !accumulator.service_names.is_empty() { + low_cardinality_tags.insert(TAG_SERVICE.to_string(), accumulator.service_names); + } + + Ok(MergeOutputFile { + path: output_path, + num_rows: accumulator.num_rows, + // We didn't track per-file RG count; sum is len of regions + // assigned to this output. Could be threaded through, but for + // PR-6c.2 we report `1` to match the writer's expectation that + // multi-RG output declares prefix_len=0 (handled by KV at + // writer-open time using input_meta.rg_partition_prefix_len). + num_row_groups: 1, + size_bytes, + row_keys_proto, + zonemap_regexes, + metric_names, + time_range, + low_cardinality_tags, + }) +} + fn drain_sort_cols_one_input( handle: &Handle, state: &mut InputDecoderState, sort_fields_str: &str, input_idx: usize, + expected_rg_idx: usize, ) -> Result { - if state.metadata.num_row_groups() == 0 { - // Empty input — no rows to drain. Return a zero-row batch with the + if state.metadata.num_row_groups() == 0 || expected_rg_idx >= state.metadata.num_row_groups() { + // No rows to drain at this RG. Return a zero-row batch with the // sort cols' fields preserved so downstream merge order code sees a // uniform schema across inputs. return empty_sort_col_record_batch(state, sort_fields_str); @@ -456,16 +1134,16 @@ fn drain_sort_cols_one_input( } // Target row count per sort col (from row group's column chunk metadata). - let rg_meta = state.metadata.row_group(0); + let rg_meta = state.metadata.row_group(expected_rg_idx); let mut target_rows_per_col: HashMap = HashMap::new(); for &col_idx in sort_col_parquet_indices.keys() { target_rows_per_col.insert(col_idx, rg_meta.column(col_idx).num_values() as usize); } // Drain pages into per-col buffers until all sort cols are fully - // decoded. With Husky storage ordering, sort col pages come before - // any body col page within the row group, so we should never see a - // non-sort page while sort cols are incomplete. + // decoded for this RG. With Husky storage ordering, sort col pages + // come before any body col page within the row group, so we should + // never see a non-sort page while sort cols are incomplete. let mut per_col_pages: HashMap> = HashMap::new(); let mut rows_done_per_col: HashMap = sort_col_parquet_indices.keys().map(|&i| (i, 0)).collect(); @@ -477,26 +1155,28 @@ fn drain_sort_cols_one_input( while sort_cols_finished < sort_col_target { let decoded = handle .block_on(decoder.decode_next_page()) - .with_context(|| format!("decoding sort col page (input {input_idx})"))?; + .with_context(|| { + format!("decoding sort col page (input {input_idx}, rg {expected_rg_idx})") + })?; let page = match decoded { Some(p) => p, None => bail!( - "stream ended before sort cols fully drained for input {input_idx}: \ - {sort_cols_finished}/{sort_col_target} cols complete", + "stream ended before sort cols fully drained for input {input_idx} rg \ + {expected_rg_idx}: {sort_cols_finished}/{sort_col_target} cols complete", ), }; if !sort_col_parquet_indices.contains_key(&page.col_idx) { bail!( "input {input_idx} returned a non-sort page (col {}) before all sort cols were \ - drained — this violates Husky storage ordering", + drained for rg {expected_rg_idx} — this violates Husky storage ordering", page.col_idx, ); } - if page.rg_idx != 0 { + if page.rg_idx != expected_rg_idx { bail!( - "input {input_idx} returned a page from rg {} during sort col drain — only single-\ - RG inputs are supported in PR-6b.2", + "input {input_idx} returned a page from rg {} while draining sort cols of rg \ + {expected_rg_idx}", page.rg_idx, ); } @@ -531,672 +1211,144 @@ fn drain_sort_cols_one_input( continue; }; let pages = per_col_pages.remove(&field_idx).expect("col drained"); - let concatenated = concat_arrays(&pages).with_context(|| { - format!( - "concatenating sort col '{}' pages for input {input_idx}", - field.name(), - ) - })?; - fields.push(Arc::clone(field)); - columns.push(concatenated); - } - - let schema = Arc::new(ArrowSchema::new(fields)); - RecordBatch::try_new(schema, columns) - .with_context(|| format!("building sort col record batch for input {input_idx}")) -} - -/// Set of column names treated as "sort cols" for phase 0 drain. -fn sort_col_names_for_input( - sort_field_schema: &quickwit_proto::sortschema::SortSchema, - arrow_schema: &ArrowSchema, -) -> HashSet { - let mut names: HashSet = HashSet::new(); - for sf in &sort_field_schema.column { - if arrow_schema.field_with_name(&sf.name).is_ok() { - names.insert(sf.name.clone()); - } - // Legacy schemas may declare `timestamp` but the column is named - // `timestamp_secs`. The merge order code already handles this - // alias; we want both candidates drained whichever matches. - if is_timestamp_column_name(&sf.name) - && arrow_schema.field_with_name("timestamp_secs").is_ok() - { - names.insert("timestamp_secs".to_string()); - } - } - if arrow_schema.field_with_name(SORTED_SERIES_COLUMN).is_ok() { - names.insert(SORTED_SERIES_COLUMN.to_string()); - } - names -} - -/// Build a zero-row RecordBatch with the input's sort cols + `sorted_series`. -/// Used when an input file has zero rows (no row groups) so that downstream -/// k-way merge sees a consistent schema shape across inputs. -fn empty_sort_col_record_batch( - state: &InputDecoderState, - sort_fields_str: &str, -) -> Result { - let sort_field_schema = parse_sort_fields(sort_fields_str)?; - let sort_col_names = sort_col_names_for_input(&sort_field_schema, state.arrow_schema.as_ref()); - let mut fields: Vec> = Vec::new(); - let mut columns: Vec = Vec::new(); - for field in state.arrow_schema.fields() { - if !sort_col_names.contains(field.name()) { - continue; - } - fields.push(Arc::clone(field)); - columns.push(new_null_array(field.data_type(), 0)); - } - let schema = Arc::new(ArrowSchema::new(fields)); - RecordBatch::try_new(schema, columns).context("building empty sort col record batch") -} - -fn concat_arrays(arrays: &[ArrayRef]) -> Result { - if arrays.len() == 1 { - return Ok(Arc::clone(&arrays[0])); - } - let refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect(); - Ok(arrow::compute::concat(&refs)?) -} - -// ============================================================================ -// Pre-compute input row → output destination map -// ============================================================================ - -/// `destinations[input_idx][input_row] = Some((output_idx, output_pos))` -/// if that input row contributes to output `output_idx` at position -/// `output_pos` within that output's row range. `None` means the row -/// is not in any output (only possible for rows beyond the merge -/// plan's coverage; shouldn't happen with our merge order). -#[derive(Debug)] -struct InputRowDestinations { - /// One Vec per input. Length = input's sort-col row count. - per_input: Vec>>, - /// Total rows per output index (cumulative writer "expected" rows). - rows_per_output: Vec, -} - -fn build_input_row_destinations( - aligned_sort_batches: &[RecordBatch], - merge_order: &[MergeRun], - boundaries: &[Range], -) -> InputRowDestinations { - let mut per_input: Vec>> = aligned_sort_batches - .iter() - .map(|b| vec![None; b.num_rows()]) - .collect(); - let mut rows_per_output: Vec = vec![0; boundaries.len()]; - - for (out_idx, boundary) in boundaries.iter().enumerate() { - let runs = &merge_order[boundary.clone()]; - for run in runs { - for r in 0..run.row_count { - let input_row = run.start_row + r; - per_input[run.input_index][input_row] = Some((out_idx, rows_per_output[out_idx])); - rows_per_output[out_idx] += 1; - } - } - } - - InputRowDestinations { - per_input, - rows_per_output, - } -} - -// ============================================================================ -// Phase 3: streaming write with one writer per output -// ============================================================================ - -/// Per-output state owned across phase 3 (writer + bookkeeping). -/// The row group lives in a parallel Vec so its borrow into `writer` -/// is tracked by the compiler instead of through a `'static` -/// transmute. -struct OutputWriterStorage { - output_idx: usize, - output_path: PathBuf, - writer: StreamingParquetWriter, - /// Service-name set built during the body col write of "service" - /// (or empty if no service col). - service_names: HashSet, - /// Per-output total row count = sum of merge runs in this output's boundary. - num_rows: usize, -} - -#[allow(clippy::too_many_arguments)] -fn write_streaming_outputs( - handle: &Handle, - decoders_state: &mut [InputDecoderState], - aligned_sort_batches: &[RecordBatch], - sort_union_schema: &SchemaRef, - merge_order: &[MergeRun], - boundaries: &[Range], - destinations: &InputRowDestinations, - input_meta: &InputMetadata, - writer_config: &crate::storage::ParquetWriterConfig, - output_dir: &Path, -) -> Result> { - // 1. Build the union schema across full input arrow schemas (so the - // output covers every column that appears in any input). The - // sort union schema covers only sort cols. - let input_arrow_schemas: Vec = decoders_state - .iter() - .map(|s| Arc::clone(&s.arrow_schema)) - .collect(); - let (full_union_schema, _aligned_full_placeholder) = - build_full_union_schema_from_arrow_schemas(&input_arrow_schemas, &input_meta.sort_fields)?; - - // 2. Build per-output metadata (KV entries, row keys, zonemaps) up front - // from sort col data — these are what the schema + writer props depend on. - let per_output_static = boundaries - .iter() - .enumerate() - .map(|(out_idx, boundary)| { - build_per_output_static( - out_idx, - boundary, - aligned_sort_batches, - sort_union_schema, - merge_order, - input_meta, - ) - }) - .collect::>>()?; - - // 3. Decide per-output schema: optimise based on each output's sort col data - // (which determines metric_name cardinality, etc.). Body cols stay as - // declared by the union schema; we don't probe their cardinality here - // since we haven't read them yet. This is a slight regression vs. the - // non-streaming engine — it would dict-encode low-cardinality string - // body cols too. PR-6c.2 or later can revisit by gathering body-col - // cardinality during the streaming pass. - let per_output_schemas: Vec = per_output_static - .iter() - .map(|s| derive_output_schema(&full_union_schema, &s.sort_optimised)) - .collect::>>()?; - - // 4. Open M writers, one per output. Writers + bookkeeping live in - // `writer_states`; the row group borrows mutably from each writer - // and is held in a parallel `row_groups` Vec for the col loop. - let mut writer_states: Vec = Vec::with_capacity(boundaries.len()); - for (out_idx, (schema, static_meta)) in per_output_schemas - .iter() - .zip(per_output_static.iter()) - .enumerate() - { - if destinations.rows_per_output[out_idx] == 0 { - continue; - } - writer_states.push(open_output_writer( - out_idx, - output_dir, - Arc::clone(schema), - static_meta, - input_meta, - writer_config, - )?); - } - - // Snapshot the (output_idx, num_rows) for each storage entry BEFORE - // calling `start_row_group`, which borrows `writer_states` mutably - // for the rest of phase 3's col loop. - let writer_index_view = writer_states_index_view(&writer_states); - let num_storages = writer_states.len(); - - let mut row_groups: Vec> = - writer_states - .iter_mut() - .map(|s| { - s.writer - .start_row_group() - .with_context(|| format!("opening row group for output {}", s.output_idx)) - }) - .collect::>>()?; - - // Service names are collected into a separate Vec> - // parallel to `row_groups`; we can't write into `writer_states` here - // because it is already borrowed mutably by `row_groups`. We merge - // these back into `writer_states` after dropping the row groups. - let mut service_names_per_output: Vec> = - (0..num_storages).map(|_| HashSet::new()).collect(); - write_all_columns( - handle, - &mut row_groups, - &mut service_names_per_output, - &writer_index_view, - decoders_state, - aligned_sort_batches, - sort_union_schema, - merge_order, - boundaries, - destinations, - &per_output_schemas, - )?; - - // 6. Finish all row groups (drops the borrows on writers). - for rg in row_groups { - rg.finish().context("finishing row group")?; - } - - // 7. Merge collected service names + close writers + build MergeOutputFiles. - let mut outputs = Vec::with_capacity(writer_states.len()); - for (mut state, services) in writer_states - .into_iter() - .zip(service_names_per_output.into_iter()) - { - state.service_names.extend(services); - outputs.push(finalize_output_writer(state, &per_output_static)?); - } - Ok(outputs) -} - -/// Static per-output state computed once from sort col data. Holds -/// the per-output sort-col-only batch (used for metadata extraction) -/// and the per-output schema-optimisation hints. -struct PerOutputStatic { - /// Sort-cols-only batch in output sort order — used by row_keys / - /// zonemap / metric_names / time_range extractors. - sort_optimised: RecordBatch, - row_keys_proto: Option>, - zonemap_regexes: HashMap, - metric_names: HashSet, - time_range: crate::split::TimeRange, - /// Number of rows that go into this output. - num_rows: usize, -} - -fn build_per_output_static( - out_idx: usize, - boundary: &Range, - aligned_sort_batches: &[RecordBatch], - sort_union_schema: &SchemaRef, - merge_order: &[MergeRun], - input_meta: &InputMetadata, -) -> Result { - let runs = &merge_order[boundary.clone()]; - let sort_batch = apply_merge_permutation(aligned_sort_batches, sort_union_schema, runs) - .with_context(|| format!("applying merge permutation for output {out_idx} sort cols"))?; - let num_rows = sort_batch.num_rows(); - - // MC-3 sort order on the sort-col-only batch (same check the - // non-streaming engine does, just restricted to columns we have). - verify_sort_order(&sort_batch, &input_meta.sort_fields); - let sort_optimised = optimize_output_batch(&sort_batch); - - let row_keys_proto = row_keys::extract_row_keys(&input_meta.sort_fields, &sort_optimised) - .with_context(|| format!("extracting row keys for output {out_idx}"))? - .map(|rk| row_keys::encode_row_keys_proto(&rk)); - - let zonemap_opts = ZonemapOptions::default(); - let zonemap_regexes = - zonemap::extract_zonemap_regexes(&input_meta.sort_fields, &sort_optimised, &zonemap_opts) - .with_context(|| format!("extracting zonemap regexes for output {out_idx}"))?; - - let metric_names = extract_metric_names(&sort_optimised) - .with_context(|| format!("extracting metric names for output {out_idx}"))?; - let time_range = extract_time_range(&sort_optimised) - .with_context(|| format!("extracting time range for output {out_idx}"))?; - - Ok(PerOutputStatic { - sort_optimised, - row_keys_proto, - zonemap_regexes, - metric_names, - time_range, - num_rows, - }) -} - -/// Build the full union schema across all inputs' arrow schemas -/// (NOT just sort cols). Reuses the same algorithm as -/// [`align_inputs_to_union_schema`] but takes pre-extracted arrow -/// schemas — phase 3 doesn't have full input batches. -fn build_full_union_schema_from_arrow_schemas( - arrow_schemas: &[SchemaRef], - sort_fields_str: &str, -) -> Result<(SchemaRef, ())> { - // Build zero-row batches with the right schemas; that lets us - // reuse `align_inputs_to_union_schema`'s field-merge / Husky-order - // logic unchanged. - let empty_batches: Vec = arrow_schemas - .iter() - .map(|s| RecordBatch::new_empty(Arc::clone(s))) - .collect(); - let (schema, _) = align_inputs_to_union_schema(&empty_batches, sort_fields_str)?; - Ok((schema, ())) -} - -/// Compute the per-output schema. For PR-6b.2 we use the -/// (string-normalised) union schema as the output schema directly — -/// fields stay Utf8/LargeUtf8 rather than being re-dict-encoded. -/// Reason: streaming-decoded input arrays come out of the page -/// decoder as plain `StringArray`/`BinaryArray` (not Dictionary), and -/// dict re-encoding per output page would add a per-page CPU cost we -/// don't want to take in the page-bounded path. Re-introducing -/// dict-encoded output strings can be done later by tracking -/// cardinality during the streaming pass — call site is here. -/// -/// We do still want to drop columns that are all-null *for this -/// output* (e.g., a column only present in inputs that don't -/// contribute any rows to this output's range). Detect this from the -/// `sort_optimised` schema for sort cols; for body cols, leave them -/// in the union for now (PR-6c.2 will track per-output body-col -/// presence). -fn derive_output_schema( - full_union_schema: &SchemaRef, - sort_optimised: &RecordBatch, -) -> Result { - let sort_schema = sort_optimised.schema(); - let mut fields: Vec> = Vec::with_capacity(full_union_schema.fields().len()); - for field in full_union_schema.fields() { - // If sort_optimised dropped this field entirely, the field was - // all-null in this output's sort cols — drop from output too. - // (Body cols are always retained.) - let is_sort_field = sort_schema.index_of(field.name()).is_ok(); - if is_sort_field || full_union_schema.index_of(field.name()).is_ok() { - fields.push(Arc::clone(field)); - } - } - Ok(Arc::new(ArrowSchema::new(fields))) -} - -fn open_output_writer( - out_idx: usize, - output_dir: &Path, - schema: SchemaRef, - static_meta: &PerOutputStatic, - input_meta: &InputMetadata, - writer_config: &crate::storage::ParquetWriterConfig, -) -> Result { - let output_prefix_len = input_meta.rg_partition_prefix_len; - let kv_entries = build_merge_kv_metadata( - input_meta, - &static_meta.row_keys_proto, - &static_meta.zonemap_regexes, - output_prefix_len, - ); - let sorting_cols = build_sorting_columns(&static_meta.sort_optimised, &input_meta.sort_fields)?; - let sort_field_names = resolve_sort_field_names(&input_meta.sort_fields)?; - - let props = writer_config.to_writer_properties_with_metadata( - &schema, - sorting_cols, - Some(kv_entries), - &sort_field_names, - ); - - let output_filename = format!("merge_output_{}.parquet", Ulid::new()); - let output_path = output_dir.join(&output_filename); - let file = std::fs::File::create(&output_path) - .with_context(|| format!("creating output file: {}", output_path.display()))?; - let writer = StreamingParquetWriter::try_new(file, Arc::clone(&schema), props) - .with_context(|| format!("opening streaming writer for output {out_idx}"))?; - - Ok(OutputWriterStorage { - output_idx: out_idx, - output_path, - writer, - service_names: HashSet::new(), - num_rows: static_meta.num_rows, - }) -} - -/// Index view used inside the col loop to find the writer's -/// `output_idx` and `num_rows` without needing a mutable borrow on -/// `writer_states` (which is already mutably borrowed by `row_groups`). -fn writer_states_index_view(writer_states: &[OutputWriterStorage]) -> Vec<(usize, usize)> { - writer_states - .iter() - .map(|s| (s.output_idx, s.num_rows)) - .collect() -} - -#[allow(clippy::too_many_arguments)] -fn write_all_columns( - handle: &Handle, - row_groups: &mut [crate::storage::streaming_writer::RowGroupBuilder<'_, std::fs::File>], - service_names_per_output: &mut [HashSet], - writer_index_view: &[(usize, usize)], - decoders_state: &mut [InputDecoderState], - aligned_sort_batches: &[RecordBatch], - sort_union_schema: &SchemaRef, - merge_order: &[MergeRun], - boundaries: &[Range], - destinations: &InputRowDestinations, - per_output_schemas: &[SchemaRef], -) -> Result<()> { - // Iterate cols in the union (per-output) schema order. We need - // ONE pass through Husky-ordered cols of an output to feed its - // writer. The writer's expected col order is each output's own - // schema. All outputs share Husky col order with possible - // per-output drops of all-null columns. - // - // Strategy: process the FULL union schema's cols in order. For - // each col K in the union schema, for each output: - // - If output's schema includes col K: write col K's data - // (sort col → from buffer, body col → from decoder). - // - Else: skip (col was dropped as all-null for this output). - // This keeps decoder advancement in lockstep across outputs: a - // body col K's pages are consumed once across all outputs in turn. - - // We need the parent union schema to drive iteration. Recompute - // here (cheap) from the per-output schemas + sort union schema. - let parent_union_schema = build_parent_union_schema(per_output_schemas); - - // For each union-schema col K: - for parent_col_idx in 0..parent_union_schema.fields().len() { - let parent_field = parent_union_schema.field(parent_col_idx); - let parent_name = parent_field.name(); - - // Is this a sort col (in memory) or a body col (streamed)? - let is_sort_col = sort_union_schema.index_of(parent_name).is_ok(); - - if is_sort_col { - write_sort_col_for_all_outputs( - row_groups, - writer_index_view, - parent_name, - aligned_sort_batches, - sort_union_schema, - merge_order, - boundaries, - destinations, - per_output_schemas, - )?; - } else { - write_body_col_for_all_outputs( - handle, - row_groups, - service_names_per_output, - writer_index_view, - decoders_state, - parent_name, - destinations, - per_output_schemas, - )?; - } + let concatenated = concat_arrays(&pages).with_context(|| { + format!( + "concatenating sort col '{}' pages for input {input_idx}", + field.name(), + ) + })?; + fields.push(Arc::clone(field)); + columns.push(concatenated); } - Ok(()) + let schema = Arc::new(ArrowSchema::new(fields)); + RecordBatch::try_new(schema, columns) + .with_context(|| format!("building sort col record batch for input {input_idx}")) } -fn build_parent_union_schema(per_output_schemas: &[SchemaRef]) -> SchemaRef { - // All per-output schemas have the same column order (Husky), with - // some cols possibly missing per output (all-null drops). Pick the - // schema with the most fields as the parent driver, ties broken by - // first occurrence. This is safe because all schemas share Husky - // ordering as a subsequence. - let mut best = Arc::clone(&per_output_schemas[0]); - for s in per_output_schemas.iter().skip(1) { - if s.fields().len() > best.fields().len() { - best = Arc::clone(s); +/// Set of column names treated as "sort cols" for phase 0 drain. +fn sort_col_names_for_input( + sort_field_schema: &quickwit_proto::sortschema::SortSchema, + arrow_schema: &ArrowSchema, +) -> HashSet { + let mut names: HashSet = HashSet::new(); + for sf in &sort_field_schema.column { + if arrow_schema.field_with_name(&sf.name).is_ok() { + names.insert(sf.name.clone()); + } + // Legacy schemas may declare `timestamp` but the column is named + // `timestamp_secs`. The merge order code already handles this + // alias; we want both candidates drained whichever matches. + if is_timestamp_column_name(&sf.name) + && arrow_schema.field_with_name("timestamp_secs").is_ok() + { + names.insert("timestamp_secs".to_string()); } } - best + if arrow_schema.field_with_name(SORTED_SERIES_COLUMN).is_ok() { + names.insert(SORTED_SERIES_COLUMN.to_string()); + } + names } -#[allow(clippy::too_many_arguments)] -fn write_sort_col_for_all_outputs( - row_groups: &mut [crate::storage::streaming_writer::RowGroupBuilder<'_, std::fs::File>], - writer_index_view: &[(usize, usize)], - col_name: &str, - aligned_sort_batches: &[RecordBatch], - sort_union_schema: &SchemaRef, - merge_order: &[MergeRun], - boundaries: &[Range], - destinations: &InputRowDestinations, - per_output_schemas: &[SchemaRef], -) -> Result<()> { - let _ = sort_union_schema; - - let mut storage_idx = 0; - for (out_idx, boundary) in boundaries.iter().enumerate() { - if destinations.rows_per_output[out_idx] == 0 { - continue; - } - debug_assert_eq!(writer_index_view[storage_idx].0, out_idx); - - // Drop this col if the output's schema doesn't include it. - let out_schema = &per_output_schemas[out_idx]; - if out_schema.index_of(col_name).is_err() { - storage_idx += 1; +/// Build a zero-row RecordBatch with the input's sort cols + `sorted_series`. +/// Used when an input file has zero rows (no row groups) so that downstream +/// k-way merge sees a consistent schema shape across inputs. +fn empty_sort_col_record_batch( + state: &InputDecoderState, + sort_fields_str: &str, +) -> Result { + let sort_field_schema = parse_sort_fields(sort_fields_str)?; + let sort_col_names = sort_col_names_for_input(&sort_field_schema, state.arrow_schema.as_ref()); + let mut fields: Vec> = Vec::new(); + let mut columns: Vec = Vec::new(); + for field in state.arrow_schema.fields() { + if !sort_col_names.contains(field.name()) { continue; } - - let runs = &merge_order[boundary.clone()]; - let arrays = build_sort_col_pages_for_output(col_name, aligned_sort_batches, runs)?; - row_groups[storage_idx] - .write_next_column_arrays(arrays.into_iter()) - .with_context(|| format!("writing sort col '{col_name}' to output {out_idx}"))?; - storage_idx += 1; + fields.push(Arc::clone(field)); + columns.push(new_null_array(field.data_type(), 0)); } - Ok(()) + let schema = Arc::new(ArrowSchema::new(fields)); + RecordBatch::try_new(schema, columns).context("building empty sort col record batch") } -/// Build per-output-page arrays for one sort col. The col is already -/// in memory across all inputs (`aligned_sort_batches`); for this -/// output we walk its merge runs and split the take result into -/// `OUTPUT_PAGE_ROWS`-sized chunks. -fn build_sort_col_pages_for_output( - col_name: &str, - aligned_sort_batches: &[RecordBatch], - runs: &[MergeRun], -) -> Result> { - // Collect references to each input's column array. - let mut input_arrays: Vec<&dyn Array> = Vec::with_capacity(aligned_sort_batches.len()); - for batch in aligned_sort_batches { - let idx = batch.schema().index_of(col_name).map_err(|_| { - anyhow!("input is missing sort col '{col_name}' that the union schema expected",) - })?; - input_arrays.push(batch.column(idx).as_ref()); - } - - let mut indices: Vec<(usize, usize)> = - Vec::with_capacity(runs.iter().map(|r| r.row_count).sum()); - for run in runs { - for r in 0..run.row_count { - indices.push((run.input_index, run.start_row + r)); - } - } - - // Split into OUTPUT_PAGE_ROWS-sized chunks; each chunk → one - // arrow::interleave call → one ArrayRef. - let mut pages = Vec::with_capacity(indices.len().div_ceil(OUTPUT_PAGE_ROWS)); - for chunk in indices.chunks(OUTPUT_PAGE_ROWS) { - let arr = interleave(&input_arrays, chunk) - .with_context(|| format!("interleaving sort col '{col_name}' pages"))?; - pages.push(arr); +fn concat_arrays(arrays: &[ArrayRef]) -> Result { + if arrays.len() == 1 { + return Ok(Arc::clone(&arrays[0])); } - Ok(pages) + let refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect(); + Ok(arrow::compute::concat(&refs)?) } -#[allow(clippy::too_many_arguments)] -fn write_body_col_for_all_outputs( - handle: &Handle, - row_groups: &mut [crate::storage::streaming_writer::RowGroupBuilder<'_, std::fs::File>], - service_names_per_output: &mut [HashSet], - writer_index_view: &[(usize, usize)], - decoders_state: &mut [InputDecoderState], - col_name: &str, - destinations: &InputRowDestinations, - per_output_schemas: &[SchemaRef], -) -> Result<()> { - // Find this col's per-input parquet leaf index (one per input). - // Inputs whose schema doesn't have this col contribute null rows - // and don't advance their decoder for this col. - let mut input_col_indices: Vec> = Vec::with_capacity(decoders_state.len()); - let mut input_target_rows: Vec = Vec::with_capacity(decoders_state.len()); - for state in decoders_state.iter() { - match state.arrow_schema.index_of(col_name) { - Ok(idx) => { - input_col_indices.push(Some(idx)); - let rg = state.metadata.row_group(0); - input_target_rows.push(rg.column(idx).num_values() as usize); - } - Err(_) => { - input_col_indices.push(None); - input_target_rows.push(state.metadata.row_group(0).num_rows() as usize); - } - } - } - - // For each output sequentially: build output pages, feed to writer. - let mut storage_idx = 0; - for (out_idx, &row_count) in destinations.rows_per_output.iter().enumerate() { - if row_count == 0 { - continue; - } - debug_assert_eq!(writer_index_view[storage_idx].0, out_idx); +// ============================================================================ +// Pre-compute input row → output destination map +// ============================================================================ - let out_schema = &per_output_schemas[out_idx]; - if out_schema.index_of(col_name).is_err() { - storage_idx += 1; - continue; - } - let out_field_idx = out_schema.index_of(col_name)?; - let out_field = out_schema.field(out_field_idx); +/// `destinations[input_idx][input_row] = Some((output_idx, output_pos))` +/// if that input row contributes to output `output_idx` at position +/// `output_pos` within that output's row range. `None` means the row +/// is not in any output (only possible for rows beyond the merge +/// plan's coverage; shouldn't happen with our merge order). +#[derive(Debug)] +struct InputRowDestinations { + /// One Vec per input. Length = input's sort-col row count. + per_input: Vec>>, + /// Total rows per output index (cumulative writer "expected" rows). + rows_per_output: Vec, +} - // Build per-output assembler. Feeds one output page per - // `Iterator::next()` call. - let assembler = BodyColOutputPageAssembler::new( - handle, - decoders_state, - &input_col_indices, - destinations, - out_idx, - col_name, - out_field, - ); +fn build_input_row_destinations( + aligned_sort_batches: &[RecordBatch], + merge_order: &[MergeRun], + boundaries: &[Range], +) -> InputRowDestinations { + let mut per_input: Vec>> = aligned_sort_batches + .iter() + .map(|b| vec![None; b.num_rows()]) + .collect(); + let mut rows_per_output: Vec = vec![0; boundaries.len()]; - // Track service names while streaming the service col. - let track_service = col_name == "service"; - - // Drive the assembler via a sync iterator. We must `?`-propagate - // assembly errors out of the iterator; collect into a Vec and - // return on first error. - let pages: Vec = assembler - .into_iter() - .collect::>>() - .with_context(|| format!("assembling body col '{col_name}' for output {out_idx}"))?; - - if track_service { - for page in &pages { - collect_service_names_from_page( - page.as_ref(), - &mut service_names_per_output[storage_idx], - )?; + for (out_idx, boundary) in boundaries.iter().enumerate() { + let runs = &merge_order[boundary.clone()]; + for run in runs { + for r in 0..run.row_count { + let input_row = run.start_row + r; + per_input[run.input_index][input_row] = Some((out_idx, rows_per_output[out_idx])); + rows_per_output[out_idx] += 1; } } + } - row_groups[storage_idx] - .write_next_column_arrays(pages.into_iter()) - .with_context(|| format!("writing body col '{col_name}' to output {out_idx}"))?; - storage_idx += 1; + InputRowDestinations { + per_input, + rows_per_output, } +} - Ok(()) +// ============================================================================ +// Obsolete PR-6b.2 multi-output-parallel helpers (deleted in PR-6c.2's +// per-region restructure). The functions below are no longer used — +// per-region processing in `process_region` is the new path. +// ============================================================================ + +/// Build the full union schema across all inputs' arrow schemas +/// (NOT just sort cols). Reuses the same algorithm as +/// [`align_inputs_to_union_schema`] but takes pre-extracted arrow +/// schemas — phase 3 doesn't have full input batches. +fn build_full_union_schema_from_arrow_schemas( + arrow_schemas: &[SchemaRef], + sort_fields_str: &str, +) -> Result { + // Build zero-row batches with the right schemas; that lets us + // reuse `align_inputs_to_union_schema`'s field-merge / Husky-order + // logic unchanged. + let empty_batches: Vec = arrow_schemas + .iter() + .map(|s| RecordBatch::new_empty(Arc::clone(s))) + .collect(); + let (schema, _) = align_inputs_to_union_schema(&empty_batches, sort_fields_str)?; + Ok(schema) } /// Per-page service name collector. Used during the streaming write @@ -1660,46 +1812,6 @@ fn advance_decoder_to_row( } } -fn finalize_output_writer( - state: OutputWriterStorage, - per_output_static: &[PerOutputStatic], -) -> Result { - let OutputWriterStorage { - output_idx, - output_path, - writer, - service_names, - num_rows, - } = state; - - let _metadata = writer - .close() - .with_context(|| format!("closing writer for output {output_idx}"))?; - - let size_bytes = std::fs::metadata(&output_path) - .with_context(|| format!("stat output file: {}", output_path.display()))? - .len(); - - let static_meta = &per_output_static[output_idx]; - - let mut low_cardinality_tags: HashMap> = HashMap::new(); - if !service_names.is_empty() { - low_cardinality_tags.insert(TAG_SERVICE.to_string(), service_names); - } - - Ok(MergeOutputFile { - path: output_path, - num_rows, - num_row_groups: 1, - size_bytes, - row_keys_proto: static_meta.row_keys_proto.clone(), - zonemap_regexes: static_meta.zonemap_regexes.clone(), - metric_names: static_meta.metric_names.clone(), - time_range: static_meta.time_range, - low_cardinality_tags, - }) -} - // ============================================================================ // Tests // ============================================================================ @@ -2192,9 +2304,156 @@ mod tests { ); } - /// Multi-RG input is rejected (PR-6b.2 simplification). + /// Multi-RG metric-aligned input (`prefix_len >= 1`) is accepted + /// and produces multi-RG output: one output RG per input metric_name + /// region. + #[tokio::test] + async fn test_multi_rg_metric_aligned_input_produces_multi_rg_output() { + // Build a fixture with 2 metric_names → 2 RGs each holding one + // metric_name. Use `prefix_len = 1` to declare metric_name + // alignment. + let bytes = make_two_metric_aligned_input(); + let inputs: Vec> = vec![open_stream(bytes).await]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge multi-RG metric-aligned input"); + assert_eq!(outputs.len(), 1, "expected one output file"); + assert_eq!(outputs[0].num_rows, 60, "30 + 30 rows"); + + let out_bytes = std::fs::read(&outputs[0].path).expect("read"); + let reader = SerializedFileReader::new(Bytes::from(out_bytes)).expect("ser"); + assert_eq!( + reader.metadata().num_row_groups(), + 2, + "multi-RG metric-aligned input must produce multi-RG output (one RG per metric_name region)", + ); + } + + /// Build a parquet fixture with TWO row groups, each containing + /// rows of one distinct metric_name. RG 0 = "cpu.usage" × 30 rows, + /// RG 1 = "memory.used" × 30 rows. `rg_partition_prefix_len = 1` + /// declares metric_name alignment. + fn make_two_metric_aligned_input() -> Bytes { + let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("metric_name", dict_type.clone(), false), + Field::new("timestamp_secs", DataType::UInt64, false), + Field::new("sorted_series", DataType::Binary, false), + Field::new("metric_type", DataType::UInt8, false), + Field::new("service", dict_type, true), + Field::new("timeseries_id", DataType::Int64, false), + Field::new("value", DataType::Float64, false), + ])); + + let make_batch = |metric_key: i32, start_series: u64, rows: usize| -> RecordBatch { + let metric_keys: Vec = vec![metric_key; rows]; + let metric_values = StringArray::from(vec!["cpu.usage", "memory.used"]); + let metric_name: ArrayRef = Arc::new( + DictionaryArray::::try_new( + arrow::array::Int32Array::from(metric_keys), + Arc::new(metric_values), + ) + .expect("dict"), + ); + let timestamps: Vec = (0..rows as u64) + .map(|i| 1_700_000_000 + (rows as u64 - i)) + .collect(); + let timestamp_secs: ArrayRef = Arc::new(UInt64Array::from(timestamps)); + let mut series_bytes: Vec> = Vec::with_capacity(rows); + for i in 0..rows as u64 { + series_bytes.push((start_series + i).to_be_bytes().to_vec()); + } + let series_refs: Vec<&[u8]> = series_bytes.iter().map(|v| v.as_slice()).collect(); + let sorted_series: ArrayRef = Arc::new(BinaryArray::from(series_refs)); + let metric_type: ArrayRef = Arc::new(UInt8Array::from(vec![0u8; rows])); + let svc_values = StringArray::from(vec!["api", "db", "cache"]); + let svc_keys: Vec> = (0..rows as i32) + .map(|i| if i % 5 == 0 { None } else { Some(i % 3) }) + .collect(); + let service: ArrayRef = Arc::new( + DictionaryArray::::try_new( + arrow::array::Int32Array::from(svc_keys), + Arc::new(svc_values), + ) + .expect("dict"), + ); + let tsids: Vec = (0..rows as i64).map(|i| 1000 + i).collect(); + let timeseries_id: ArrayRef = Arc::new(Int64Array::from(tsids)); + let values: Vec = (0..rows).map(|i| i as f64).collect(); + let value: ArrayRef = Arc::new(Float64Array::from(values)); + + RecordBatch::try_new( + schema.clone(), + vec![ + metric_name, + timestamp_secs, + sorted_series, + metric_type, + service, + timeseries_id, + value, + ], + ) + .expect("batch") + }; + + let batch_cpu = make_batch(0, 0, 30); + let batch_mem = make_batch(1, 100, 30); + + let cfg = ParquetWriterConfig { + compression: Compression::Snappy, + row_group_size: 30, // one RG per metric_name + ..ParquetWriterConfig::default() + }; + let kvs = vec![ + KeyValue::new( + PARQUET_META_SORT_FIELDS.to_string(), + "metric_name|-timestamp_secs/V2".to_string(), + ), + KeyValue::new( + PARQUET_META_WINDOW_START.to_string(), + "1700000000".to_string(), + ), + KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()), + KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()), + // `prefix_len = 1` declares metric_name alignment. + KeyValue::new( + PARQUET_META_RG_PARTITION_PREFIX_LEN.to_string(), + "1".to_string(), + ), + ]; + let sorting_cols = vec![ + parquet::file::metadata::SortingColumn { + column_idx: 0, + descending: false, + nulls_first: false, + }, + parquet::file::metadata::SortingColumn { + column_idx: 1, + descending: true, + nulls_first: false, + }, + ]; + let props: WriterProperties = cfg.to_writer_properties_with_metadata( + &schema, + sorting_cols, + Some(kvs), + &["metric_name".to_string(), "timestamp_secs".to_string()], + ); + let mut buf: Vec = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props)).expect("arrow writer"); + writer.write(&batch_cpu).expect("write cpu"); + writer.write(&batch_mem).expect("write mem"); + writer.close().expect("close"); + Bytes::from(buf) + } + + /// Legacy multi-RG input (prefix_len=0, num_RGs>1) is rejected — + /// these must route through PR-5's `LegacyMultiRGAdapter`. #[tokio::test] - async fn test_multi_rg_input_rejected() { + async fn test_legacy_multi_rg_input_rejected() { // Force a 2-RG file by writing two batches with row_group_size = 1 // small enough to trip RG rollover. let batch_a = make_sorted_batch(50, 0); @@ -2235,11 +2494,11 @@ mod tests { let tmp = TempDir::new().expect("tmpdir"); let err = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) .await - .expect_err("multi-RG input must be rejected"); + .expect_err("legacy multi-RG input must be rejected"); let s = err.to_string(); assert!( - s.contains("single-row-group"), - "expected 'single-row-group' error, got: {s}", + s.contains("legacy multi-RG") || s.contains("PR-5 adapter"), + "expected legacy multi-RG rejection, got: {s}", ); } }