diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 0a6807c3884..3c9745f8194 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -8592,6 +8592,7 @@ dependencies = [ "google-cloud-pubsub", "itertools 0.14.0", "libz-sys", + "metrics", "mockall", "oneshot 0.2.1", "openssl", diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml index 2a0d581797d..b48dcd86851 100644 --- a/quickwit/quickwit-indexing/Cargo.toml +++ b/quickwit/quickwit-indexing/Cargo.toml @@ -29,6 +29,7 @@ google-cloud-googleapis = { workspace = true, optional = true } google-cloud-pubsub = { workspace = true, optional = true } itertools = { workspace = true } libz-sys = { workspace = true, optional = true } +metrics = { workspace = true } oneshot = { workspace = true } openssl = { workspace = true, optional = true } percent-encoding = { workspace = true } diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs index 969b87f3b26..47fc21b2362 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs @@ -23,6 +23,7 @@ //! ``` mod indexing_service_impl; +mod parquet_compaction_metrics; mod parquet_doc_processor; mod parquet_indexer; mod parquet_merge_executor; diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_compaction_metrics.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_compaction_metrics.rs new file mode 100644 index 00000000000..4dafc460ce7 --- /dev/null +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_compaction_metrics.rs @@ -0,0 +1,457 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! DogStatsD metrics for the Parquet compaction path. +//! +//! Quickwit installs the DogStatsD exporter in the CLI binary. This module uses +//! the `metrics` crate directly so these measurements are emitted to Datadog +//! instead of registering Prometheus collectors. 
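+//!
+//! A minimal sketch of the emission pattern used throughout this module; the
+//! metric name and the "my-index" tag value below are illustrative placeholders,
+//! not the real constants defined further down in this file:
+//!
+//! ```ignore
+//! // The `metrics` macros create the counter on first use; the DogStatsD
+//! // recorder installed by the CLI turns the labels into Datadog tags.
+//! metrics::counter!(
+//!     "parquet.compaction.example_counter",
+//!     "index" => "my-index".to_string(),
+//!     "kind" => "points",
+//! )
+//! .increment(1);
+//! ```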
+ +use std::time::Duration; + +use quickwit_parquet_engine::merge::policy::ParquetMergeOperation; +use quickwit_parquet_engine::split::{ParquetSplitKind, ParquetSplitMetadata}; + +const STATUS_PLANNED: &str = "planned"; +const STATUS_COMPLETED: &str = "completed"; +const STATUS_FAILED: &str = "failed"; + +const STAGE_SCHEDULE: &str = "schedule"; +const STAGE_DOWNLOAD: &str = "download"; +const STAGE_MERGE: &str = "merge"; +const STAGE_STAGING: &str = "staging"; +const STAGE_UPLOAD: &str = "upload"; +const STAGE_PUBLISH: &str = "publish"; +const STAGE_SEQUENCER: &str = "sequencer"; + +const REASON_MATURE: &str = "mature"; +const REASON_TIME_MATURED: &str = "time_matured"; +const REASON_DUPLICATE: &str = "duplicate"; +const REASON_MISSING_SCOPE: &str = "missing_scope"; + +const METRIC_OPERATIONS: &str = "parquet.compaction.operations"; +const METRIC_FAILURES: &str = "parquet.compaction.failures"; +const METRIC_SKIPPED_SPLITS: &str = "parquet.compaction.planner.skipped_splits"; +const METRIC_INPUT_BYTES: &str = "parquet.compaction.input_bytes"; +const METRIC_OUTPUT_BYTES: &str = "parquet.compaction.output_bytes"; +const METRIC_INPUT_SPLITS: &str = "parquet.compaction.input_splits"; +const METRIC_OUTPUT_SPLITS: &str = "parquet.compaction.output_splits"; +const METRIC_ROWS: &str = "parquet.compaction.rows"; +const METRIC_STAGED_SPLITS: &str = "parquet.compaction.staged_splits"; +const METRIC_UPLOADED_SPLITS: &str = "parquet.compaction.uploaded_splits"; +const METRIC_UPLOADED_BYTES: &str = "parquet.compaction.uploaded_bytes"; +const METRIC_PUBLISHED_SPLITS: &str = "parquet.compaction.published_splits"; +const METRIC_REPLACED_SPLITS: &str = "parquet.compaction.replaced_splits"; +const METRIC_DURATION_SECONDS: &str = "parquet.compaction.duration_seconds"; +const METRIC_INPUT_BYTES_PER_OPERATION: &str = "parquet.compaction.input_bytes_per_operation"; +const METRIC_OUTPUT_BYTES_PER_OPERATION: &str = "parquet.compaction.output_bytes_per_operation"; +const METRIC_INPUT_SPLITS_PER_OPERATION: &str = "parquet.compaction.input_splits_per_operation"; +const METRIC_OUTPUT_SPLITS_PER_OPERATION: &str = "parquet.compaction.output_splits_per_operation"; +const METRIC_ROWS_PER_OPERATION: &str = "parquet.compaction.rows_per_operation"; +const METRIC_INPUT_OUTPUT_BYTES_RATIO: &str = "parquet.compaction.input_output_bytes_ratio"; +const METRIC_PLANNER_ELIGIBLE_SPLITS: &str = "parquet.compaction.planner.eligible_splits"; +const METRIC_PLANNER_ONGOING_OPERATIONS: &str = "parquet.compaction.planner.ongoing_operations"; + +fn kind_label(kind: ParquetSplitKind) -> &'static str { + match kind { + ParquetSplitKind::Metrics => "points", + ParquetSplitKind::Sketches => "sketches", + } +} + +pub(super) fn index_id_from_uid(index_uid: &str) -> &str { + index_uid + .split_once(':') + .map(|(index_id, _)| index_id) + .unwrap_or(index_uid) +} + +fn operation_labels(merge_operation: &ParquetMergeOperation) -> (&str, ParquetSplitKind) { + merge_operation + .splits + .first() + .map(|split| (index_id_from_uid(&split.index_uid), split.kind)) + .unwrap_or(("unknown", ParquetSplitKind::Metrics)) +} + +fn split_labels(split: &ParquetSplitMetadata) -> (&str, ParquetSplitKind) { + (index_id_from_uid(&split.index_uid), split.kind) +} + +pub(super) fn kind_from_index_id(index_id: &str) -> ParquetSplitKind { + if quickwit_common::is_sketches_index(index_id) { + ParquetSplitKind::Sketches + } else { + ParquetSplitKind::Metrics + } +} + +pub(super) struct ParquetCompactionMetrics; + +impl ParquetCompactionMetrics { + pub(super) fn 
record_mature_split(&self, split: &ParquetSplitMetadata) { + self.record_skipped_split(split, REASON_MATURE); + } + + pub(super) fn record_time_matured_split(&self, split: &ParquetSplitMetadata) { + self.record_skipped_split(split, REASON_TIME_MATURED); + } + + pub(super) fn record_duplicate_split(&self, split: &ParquetSplitMetadata) { + self.record_skipped_split(split, REASON_DUPLICATE); + } + + pub(super) fn record_missing_scope_split(&self, split: &ParquetSplitMetadata) { + self.record_skipped_split(split, REASON_MISSING_SCOPE); + } + + pub(super) fn record_planned_operation(&self, merge_operation: &ParquetMergeOperation) { + let (index_id, kind) = operation_labels(merge_operation); + increment_status_counter(METRIC_OPERATIONS, index_id, kind, STATUS_PLANNED, 1); + increment_gauge(METRIC_PLANNER_ONGOING_OPERATIONS, index_id, kind, 1.0); + increment_labeled_counter( + METRIC_INPUT_BYTES, + index_id, + kind, + merge_operation.total_size_bytes(), + ); + increment_labeled_counter( + METRIC_INPUT_SPLITS, + index_id, + kind, + merge_operation.splits.len() as u64, + ); + record_labeled_histogram( + METRIC_INPUT_BYTES_PER_OPERATION, + index_id, + kind, + merge_operation.total_size_bytes() as f64, + ); + record_labeled_histogram( + METRIC_INPUT_SPLITS_PER_OPERATION, + index_id, + kind, + merge_operation.splits.len() as f64, + ); + } + + pub(super) fn record_download_success( + &self, + merge_operation: &ParquetMergeOperation, + duration: Duration, + ) { + self.record_stage_duration(merge_operation, STAGE_DOWNLOAD, duration); + } + + pub(super) fn record_schedule_failure( + &self, + merge_operation: &ParquetMergeOperation, + duration: Duration, + ) { + self.record_operation_failure(merge_operation, STAGE_SCHEDULE, duration); + } + + pub(super) fn record_download_failure( + &self, + merge_operation: &ParquetMergeOperation, + duration: Duration, + ) { + self.record_operation_failure(merge_operation, STAGE_DOWNLOAD, duration); + } + + pub(super) fn record_merge_success( + &self, + merge_operation: &ParquetMergeOperation, + output_splits: u64, + output_bytes: u64, + duration: Duration, + ) { + let (index_id, kind) = operation_labels(merge_operation); + self.record_stage_duration(merge_operation, STAGE_MERGE, duration); + increment_labeled_counter(METRIC_OUTPUT_BYTES, index_id, kind, output_bytes); + increment_labeled_counter(METRIC_OUTPUT_SPLITS, index_id, kind, output_splits); + increment_labeled_counter( + METRIC_ROWS, + index_id, + kind, + merge_operation.total_num_rows(), + ); + record_labeled_histogram( + METRIC_OUTPUT_BYTES_PER_OPERATION, + index_id, + kind, + output_bytes as f64, + ); + record_labeled_histogram( + METRIC_OUTPUT_SPLITS_PER_OPERATION, + index_id, + kind, + output_splits as f64, + ); + record_labeled_histogram( + METRIC_ROWS_PER_OPERATION, + index_id, + kind, + merge_operation.total_num_rows() as f64, + ); + if output_bytes > 0 { + record_labeled_histogram( + METRIC_INPUT_OUTPUT_BYTES_RATIO, + index_id, + kind, + merge_operation.total_size_bytes() as f64 / output_bytes as f64, + ); + } + } + + pub(super) fn record_merge_failure( + &self, + merge_operation: &ParquetMergeOperation, + duration: Duration, + ) { + self.record_operation_failure(merge_operation, STAGE_MERGE, duration); + } + + pub(super) fn record_stage_failure( + &self, + index_id: &str, + kind: ParquetSplitKind, + duration: Duration, + ) { + self.record_operation_failure_for_labels(index_id, kind, STAGE_STAGING, duration); + } + + pub(super) fn record_stage_success( + &self, + index_id: &str, + kind: ParquetSplitKind, 
+ num_splits: usize, + duration: Duration, + ) { + increment_labeled_counter(METRIC_STAGED_SPLITS, index_id, kind, num_splits as u64); + record_stage_duration_for_labels(index_id, kind, STAGE_STAGING, duration); + } + + pub(super) fn record_upload_success( + &self, + index_id: &str, + kind: ParquetSplitKind, + num_splits: usize, + num_bytes: u64, + duration: Duration, + ) { + increment_labeled_counter(METRIC_UPLOADED_SPLITS, index_id, kind, num_splits as u64); + increment_labeled_counter(METRIC_UPLOADED_BYTES, index_id, kind, num_bytes); + record_stage_duration_for_labels(index_id, kind, STAGE_UPLOAD, duration); + } + + pub(super) fn record_upload_failure( + &self, + index_id: &str, + kind: ParquetSplitKind, + duration: Duration, + ) { + self.record_operation_failure_for_labels(index_id, kind, STAGE_UPLOAD, duration); + } + + pub(super) fn record_sequencer_failure( + &self, + index_id: &str, + kind: ParquetSplitKind, + duration: Duration, + ) { + self.record_operation_failure_for_labels(index_id, kind, STAGE_SEQUENCER, duration); + } + + pub(super) fn record_publish_success( + &self, + index_id: &str, + kind: ParquetSplitKind, + published_splits: usize, + replaced_splits: usize, + duration: Duration, + ) { + increment_status_counter(METRIC_OPERATIONS, index_id, kind, STATUS_COMPLETED, 1); + increment_labeled_counter( + METRIC_PUBLISHED_SPLITS, + index_id, + kind, + published_splits as u64, + ); + increment_labeled_counter( + METRIC_REPLACED_SPLITS, + index_id, + kind, + replaced_splits as u64, + ); + decrement_gauge(METRIC_PLANNER_ONGOING_OPERATIONS, index_id, kind, 1.0); + record_stage_duration_for_labels(index_id, kind, STAGE_PUBLISH, duration); + } + + pub(super) fn record_publish_failure( + &self, + index_id: &str, + kind: ParquetSplitKind, + duration: Duration, + ) { + self.record_operation_failure_for_labels(index_id, kind, STAGE_PUBLISH, duration); + } + + pub(super) fn set_planner_eligible_splits( + &self, + index_id: &str, + kind: ParquetSplitKind, + count: usize, + ) { + set_labeled_gauge(METRIC_PLANNER_ELIGIBLE_SPLITS, index_id, kind, count as f64); + } + + fn record_skipped_split(&self, split: &ParquetSplitMetadata, reason: &'static str) { + let (index_id, kind) = split_labels(split); + metrics::counter!( + METRIC_SKIPPED_SPLITS, + "index" => index_id.to_string(), + "kind" => kind_label(kind), + "reason" => reason, + ) + .increment(1); + } + + fn record_stage_duration( + &self, + merge_operation: &ParquetMergeOperation, + stage: &'static str, + duration: Duration, + ) { + let (index_id, kind) = operation_labels(merge_operation); + record_stage_duration_for_labels(index_id, kind, stage, duration); + } + + fn record_operation_failure( + &self, + merge_operation: &ParquetMergeOperation, + stage: &'static str, + duration: Duration, + ) { + let (index_id, kind) = operation_labels(merge_operation); + self.record_operation_failure_for_labels(index_id, kind, stage, duration); + } + + fn record_operation_failure_for_labels( + &self, + index_id: &str, + kind: ParquetSplitKind, + stage: &'static str, + duration: Duration, + ) { + increment_status_counter(METRIC_OPERATIONS, index_id, kind, STATUS_FAILED, 1); + decrement_gauge(METRIC_PLANNER_ONGOING_OPERATIONS, index_id, kind, 1.0); + metrics::counter!( + METRIC_FAILURES, + "index" => index_id.to_string(), + "kind" => kind_label(kind), + "stage" => stage, + ) + .increment(1); + record_stage_duration_for_labels(index_id, kind, stage, duration); + } +} + +fn increment_labeled_counter( + name: &'static str, + index_id: &str, + kind: 
ParquetSplitKind, + value: u64, +) { + metrics::counter!( + name, + "index" => index_id.to_string(), + "kind" => kind_label(kind), + ) + .increment(value); +} + +fn increment_status_counter( + name: &'static str, + index_id: &str, + kind: ParquetSplitKind, + status: &'static str, + value: u64, +) { + metrics::counter!( + name, + "index" => index_id.to_string(), + "kind" => kind_label(kind), + "status" => status, + ) + .increment(value); +} + +fn record_labeled_histogram( + name: &'static str, + index_id: &str, + kind: ParquetSplitKind, + value: f64, +) { + metrics::histogram!( + name, + "index" => index_id.to_string(), + "kind" => kind_label(kind), + ) + .record(value); +} + +fn record_stage_duration_for_labels( + index_id: &str, + kind: ParquetSplitKind, + stage: &'static str, + duration: Duration, +) { + metrics::histogram!( + METRIC_DURATION_SECONDS, + "index" => index_id.to_string(), + "kind" => kind_label(kind), + "stage" => stage, + ) + .record(duration.as_secs_f64()); +} + +fn set_labeled_gauge(name: &'static str, index_id: &str, kind: ParquetSplitKind, value: f64) { + metrics::gauge!( + name, + "index" => index_id.to_string(), + "kind" => kind_label(kind), + ) + .set(value); +} + +fn increment_gauge(name: &'static str, index_id: &str, kind: ParquetSplitKind, value: f64) { + metrics::gauge!( + name, + "index" => index_id.to_string(), + "kind" => kind_label(kind), + ) + .increment(value); +} + +fn decrement_gauge(name: &'static str, index_id: &str, kind: ParquetSplitKind, value: f64) { + metrics::gauge!( + name, + "index" => index_id.to_string(), + "kind" => kind_label(kind), + ) + .decrement(value); +} + +pub(super) static PARQUET_COMPACTION_METRICS: ParquetCompactionMetrics = ParquetCompactionMetrics; diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs index bf1f5bddb4d..767079a0e2f 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs @@ -18,6 +18,8 @@ //! `run_cpu_intensive()`, builds output split metadata using //! `merge_parquet_split_metadata()`, and sends the result to the uploader. +use std::time::Instant; + use anyhow::Context; use async_trait::async_trait; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; @@ -29,6 +31,7 @@ use quickwit_proto::types::IndexUid; use tracing::{info, instrument, warn}; use super::ParquetUploader; +use super::parquet_compaction_metrics::PARQUET_COMPACTION_METRICS; use super::parquet_indexer::ParquetSplitBatch; use super::parquet_merge_messages::ParquetMergeScratch; use crate::models::PublishLock; @@ -99,9 +102,14 @@ impl Handler for ParquetMergeExecutor { // Separate output subdirectory so the merge engine's temp files // don't collide with the downloaded inputs in scratch_directory. 
let output_dir = scratch.scratch_directory.path().join("merged_output"); - std::fs::create_dir_all(&output_dir) - .context("failed to create merge output directory") - .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; + let merge_started_at = Instant::now(); + if let Err(error) = + std::fs::create_dir_all(&output_dir).context("failed to create merge output directory") + { + PARQUET_COMPACTION_METRICS + .record_merge_failure(&scratch.merge_operation, merge_started_at.elapsed()); + return Err(error.into()); + } // Run the CPU-intensive merge on the dedicated thread pool. let input_paths = scratch.downloaded_parquet_files.clone(); @@ -119,6 +127,8 @@ impl Handler for ParquetMergeExecutor { let outputs: Vec = match merge_result { Ok(Ok(outputs)) => outputs, Ok(Err(merge_err)) => { + PARQUET_COMPACTION_METRICS + .record_merge_failure(&scratch.merge_operation, merge_started_at.elapsed()); warn!( error = %merge_err, merge_split_id = %merge_split_id, @@ -132,6 +142,8 @@ impl Handler for ParquetMergeExecutor { return Ok(()); } Err(panicked) => { + PARQUET_COMPACTION_METRICS + .record_merge_failure(&scratch.merge_operation, merge_started_at.elapsed()); warn!( error = %panicked, merge_split_id = %merge_split_id, @@ -141,13 +153,19 @@ impl Handler for ParquetMergeExecutor { return Ok(()); } }; - let input_splits = &scratch.merge_operation.splits; - let index_uid: IndexUid = input_splits[0] + let index_uid: IndexUid = match input_splits[0] .index_uid .parse() .context("invalid index_uid in merge input") - .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; + { + Ok(index_uid) => index_uid, + Err(error) => { + PARQUET_COMPACTION_METRICS + .record_merge_failure(&scratch.merge_operation, merge_started_at.elapsed()); + return Err(error.into()); + } + }; let replaced_split_ids: Vec = input_splits .iter() @@ -164,6 +182,10 @@ impl Handler for ParquetMergeExecutor { num_replaced = replaced_split_ids.len(), "merge produced no output — publishing replacement to clean up empty inputs" ); + // Hold a refcount on the merge operation so we can record metrics + // after the original is moved into the batch. `TrackedObject` is + // Arc-backed; cloning is a cheap refcount bump. 
+ let merge_op_for_metrics = scratch.merge_operation.clone(); let batch = ParquetSplitBatch { index_uid, splits: Vec::new(), @@ -175,15 +197,32 @@ impl Handler for ParquetMergeExecutor { _scratch_directory_opt: Some(scratch.scratch_directory), _merge_permit_opt: Some(scratch.merge_permit), }; - ctx.send_message(&self.uploader_mailbox, batch).await?; + if let Err(error) = ctx.send_message(&self.uploader_mailbox, batch).await { + PARQUET_COMPACTION_METRICS + .record_merge_failure(&merge_op_for_metrics, merge_started_at.elapsed()); + return Err(error.into()); + } + PARQUET_COMPACTION_METRICS.record_merge_success( + &merge_op_for_metrics, + 0, + 0, + merge_started_at.elapsed(), + ); return Ok(()); } let mut merged_splits = Vec::with_capacity(outputs.len()); for output in &outputs { - let mut metadata = merge_parquet_split_metadata(input_splits, output) + let mut metadata = match merge_parquet_split_metadata(input_splits, output) .context("failed to build merge output metadata") - .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; + { + Ok(metadata) => metadata, + Err(error) => { + PARQUET_COMPACTION_METRICS + .record_merge_failure(&scratch.merge_operation, merge_started_at.elapsed()); + return Err(error.into()); + } + }; // Use the split ID that was assigned when the merge operation was // planned, rather than the one generated inside @@ -196,15 +235,19 @@ impl Handler for ParquetMergeExecutor { // The uploader expects files at `output_dir/{split_id}.parquet`. let expected_path = output_dir.join(&metadata.parquet_file); if output.path != expected_path { - std::fs::rename(&output.path, &expected_path) - .with_context(|| { + if let Err(error) = + std::fs::rename(&output.path, &expected_path).with_context(|| { format!( "failed to rename merge output {} to {}", output.path.display(), expected_path.display() ) }) - .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; + { + PARQUET_COMPACTION_METRICS + .record_merge_failure(&scratch.merge_operation, merge_started_at.elapsed()); + return Err(error.into()); + } } info!( @@ -216,11 +259,18 @@ impl Handler for ParquetMergeExecutor { merged_splits.push(metadata); } + let output_splits = merged_splits.len() as u64; + let output_bytes: u64 = merged_splits.iter().map(|split| split.size_bytes).sum(); // Send to uploader. Merges have no checkpoint delta, no publish lock, // and no publish token — they're just reorganizing existing data. // The scratch directory is passed along to keep it alive until the // uploader finishes reading the merged files. + // + // Hold a refcount on the merge operation so we can record metrics + // after the original is moved into the batch. `TrackedObject` is + // Arc-backed; cloning is a cheap refcount bump. 
+ let merge_op_for_metrics = scratch.merge_operation.clone(); let batch = ParquetSplitBatch { index_uid, splits: merged_splits, @@ -233,7 +283,17 @@ impl Handler for ParquetMergeExecutor { _merge_permit_opt: Some(scratch.merge_permit), }; - ctx.send_message(&self.uploader_mailbox, batch).await?; + if let Err(error) = ctx.send_message(&self.uploader_mailbox, batch).await { + PARQUET_COMPACTION_METRICS + .record_merge_failure(&merge_op_for_metrics, merge_started_at.elapsed()); + return Err(error.into()); + } + PARQUET_COMPACTION_METRICS.record_merge_success( + &merge_op_for_metrics, + output_splits, + output_bytes, + merge_started_at.elapsed(), + ); // The merge permit is now carried by the batch — it will be held // through the uploader and released when the publisher drops the diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_planner.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_planner.rs index a829330e831..066b188e0c6 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_planner.rs @@ -34,10 +34,11 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Qu use quickwit_parquet_engine::merge::policy::{ CompactionScope, ParquetMergeOperation, ParquetMergePolicy, ParquetSplitMaturity, }; -use quickwit_parquet_engine::split::ParquetSplitMetadata; +use quickwit_parquet_engine::split::{ParquetSplitKind, ParquetSplitMetadata}; use tantivy::Inventory; use tracing::{info, warn}; +use super::parquet_compaction_metrics::{PARQUET_COMPACTION_METRICS, index_id_from_uid}; use super::{ParquetMergeSplitDownloader, ParquetNewSplits}; use crate::actors::MergeSchedulerService; use crate::actors::merge_scheduler_service::schedule_parquet_merge; @@ -98,6 +99,10 @@ pub struct ParquetMergePlanner { /// merge planning before the new planner has a consolidated view of /// published splits. See Tantivy MergePlanner #3847. incarnation_started_at: Instant, + + /// Label keys whose eligible split gauge has been set before. We keep this + /// so keys that become empty are explicitly reset to zero. + last_eligible_metric_keys: HashSet<(String, ParquetSplitKind)>, } #[async_trait] @@ -188,6 +193,7 @@ impl ParquetMergePlanner { merge_scheduler_service, ongoing_merge_operations_inventory: Inventory::default(), incarnation_started_at: Instant::now(), + last_eligible_metric_keys: HashSet::new(), }; planner.record_splits_if_necessary(immature_splits); planner @@ -205,7 +211,10 @@ impl ParquetMergePlanner { .merge_policy .split_maturity(split.size_bytes, split.num_merge_ops) { - ParquetSplitMaturity::Mature => continue, + ParquetSplitMaturity::Mature => { + PARQUET_COMPACTION_METRICS.record_mature_split(&split); + continue; + } ParquetSplitMaturity::Immature { maturation_period, .. } => { @@ -213,15 +222,18 @@ impl ParquetMergePlanner { // effectively mature — no further merges needed. This // mirrors the Tantivy merge planner's `is_mature(now)`. 
                    if split.created_at + maturation_period <= now {
+                        PARQUET_COMPACTION_METRICS.record_time_matured_split(&split);
                        continue;
                    }
                }
            }
            if !self.acknowledge_split(split.split_id.as_str()) {
+                PARQUET_COMPACTION_METRICS.record_duplicate_split(&split);
                continue;
            }
            self.record_split(split);
        }
+        self.record_planner_state();
    }
 
     /// Returns `true` if this split ID was not previously known, and records
@@ -238,6 +250,7 @@ impl ParquetMergePlanner {
     fn record_split(&mut self, split: ParquetSplitMetadata) {
         let Some(scope) = CompactionScope::from_split(&split) else {
             // Pre-Phase-31 splits have no window — can't compact them.
+            PARQUET_COMPACTION_METRICS.record_missing_scope_split(&split);
             return;
         };
         self.scoped_young_splits
@@ -246,6 +259,30 @@ impl ParquetMergePlanner {
             .push(split);
     }
 
+    fn record_planner_state(&mut self) {
+        let mut eligible_splits: HashMap<(String, ParquetSplitKind), usize> = HashMap::new();
+        for splits in self.scoped_young_splits.values() {
+            for split in splits {
+                let index_id = index_id_from_uid(&split.index_uid).to_string();
+                *eligible_splits.entry((index_id, split.kind)).or_default() += 1;
+            }
+        }
+
+        let mut current_keys: HashSet<(String, ParquetSplitKind)> =
+            eligible_splits.keys().cloned().collect();
+        for (index_id, kind) in &self.last_eligible_metric_keys {
+            current_keys.insert((index_id.clone(), *kind));
+        }
+        for (index_id, kind) in &current_keys {
+            let count = eligible_splits
+                .get(&(index_id.clone(), *kind))
+                .copied()
+                .unwrap_or_default();
+            PARQUET_COMPACTION_METRICS.set_planner_eligible_splits(index_id, *kind, count);
+        }
+        self.last_eligible_metric_keys = eligible_splits.keys().cloned().collect();
+    }
+
     /// Amortized GC for `known_split_ids`.
     ///
     /// The set grows monotonically (we add IDs but never remove inline).
@@ -328,6 +365,8 @@ impl ParquetMergePlanner {
     ) -> Result<(), ActorExitStatus> {
         let merge_ops = self.compute_merge_ops(is_finalize, ctx).await?;
         for merge_operation in merge_ops {
+            PARQUET_COMPACTION_METRICS.record_planned_operation(&merge_operation);
+            let merge_operation_metrics = merge_operation.clone();
             info!(
                 merge_split_id = %merge_operation.merge_split_id,
                 num_inputs = merge_operation.splits.len(),
@@ -337,13 +376,22 @@ impl ParquetMergePlanner {
             );
             let tracked = self
                 .ongoing_merge_operations_inventory
                 .track(merge_operation);
-            schedule_parquet_merge(
+            let schedule_started_at = Instant::now();
+            if let Err(error) = schedule_parquet_merge(
                 &self.merge_scheduler_service,
                 tracked,
                 self.merge_split_downloader_mailbox.clone(),
             )
-            .await?;
+            .await
+            {
+                PARQUET_COMPACTION_METRICS.record_schedule_failure(
+                    &merge_operation_metrics,
+                    schedule_started_at.elapsed(),
+                );
+                return Err(error.into());
+            }
         }
+        self.record_planner_state();
         Ok(())
     }
 }
diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs
index ea7b78541c1..97267885970 100644
--- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs
+++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs
@@ -20,6 +20,7 @@
 use std::path::Path;
 use std::sync::Arc;
+use std::time::Instant;
 
 use anyhow::Context;
 use async_trait::async_trait;
@@ -28,6 +29,7 @@
 use quickwit_common::temp_dir::TempDirectory;
 use quickwit_storage::Storage;
 use tracing::{debug, info, warn};
 
+use super::parquet_compaction_metrics::PARQUET_COMPACTION_METRICS;
 use super::parquet_merge_executor::ParquetMergeExecutor;
 use
super::parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask}; @@ -86,6 +88,7 @@ impl Handler for ParquetMergeSplitDownloader { ) -> Result<(), ActorExitStatus> { let merge_split_id = task.merge_operation.merge_split_id.to_string(); let num_inputs = task.merge_operation.splits.len(); + let download_started_at = Instant::now(); info!( merge_split_id = %merge_split_id, @@ -130,6 +133,10 @@ impl Handler for ParquetMergeSplitDownloader { .copy_to_file(Path::new(&parquet_filename), &local_path) .await .map_err(|e| { + PARQUET_COMPACTION_METRICS.record_download_failure( + &task.merge_operation, + download_started_at.elapsed(), + ); warn!( error = %e, split_id = %split.split_id, @@ -147,6 +154,8 @@ impl Handler for ParquetMergeSplitDownloader { num_files = downloaded_paths.len(), "all parquet files downloaded for merge" ); + PARQUET_COMPACTION_METRICS + .record_download_success(&task.merge_operation, download_started_at.elapsed()); let scratch = ParquetMergeScratch { merge_operation: task.merge_operation, diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs index 3702f727f93..f73ba696d6a 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs @@ -21,6 +21,7 @@ use std::path::Path; use std::sync::atomic::Ordering; use std::sync::{Arc, OnceLock}; +use std::time::{Duration, Instant}; use anyhow::Context; use async_trait::async_trait; @@ -33,6 +34,7 @@ use quickwit_storage::Storage; use tokio::sync::{Semaphore, SemaphorePermit, oneshot}; use tracing::{Instrument, Span, debug, info, instrument, warn}; +use super::parquet_compaction_metrics::{PARQUET_COMPACTION_METRICS, kind_from_index_id}; use super::{ParquetSplitBatch, ParquetSplitsUpdate}; use crate::actors::sequencer::{Sequencer, SequencerCommand}; use crate::actors::{Publisher, UploaderCounters, UploaderType}; @@ -163,6 +165,14 @@ impl Handler for ParquetUploader { ) -> Result<(), ActorExitStatus> { let index_uid = batch.index_uid.clone(); let num_splits = batch.splits.len(); + let is_compaction_upload = + !batch.replaced_split_ids.is_empty() || batch._merge_permit_opt.is_some(); + let compaction_kind = batch + .splits + .first() + .map(|split| split.kind) + .unwrap_or_else(|| kind_from_index_id(&index_uid.index_id)); + let compaction_index_id = index_uid.index_id.clone(); tracing::Span::current().record("index_uid", index_uid.to_string()); tracing::Span::current().record("num_splits", num_splits); @@ -173,6 +183,13 @@ impl Handler for ParquetUploader { // that must reach the publisher for graceful decommission. 
let (tx, rx) = oneshot::channel::>(); if let Err(e) = ctx.send_message(&self.sequencer_mailbox, rx).await { + if is_compaction_upload { + PARQUET_COMPACTION_METRICS.record_sequencer_failure( + &compaction_index_id, + compaction_kind, + Duration::ZERO, + ); + } warn!(error = %e, "failed to reserve sequencer position for empty batch"); return Ok(()); } @@ -188,6 +205,13 @@ impl Handler for ParquetUploader { _merge_permit_opt: batch._merge_permit_opt, }; if tx.send(SequencerCommand::Proceed(update)).is_err() { + if is_compaction_upload { + PARQUET_COMPACTION_METRICS.record_sequencer_failure( + &compaction_index_id, + compaction_kind, + Duration::ZERO, + ); + } warn!("sequencer receiver dropped for empty batch"); } return Ok(()); @@ -198,6 +222,13 @@ impl Handler for ParquetUploader { // be published in the order they were submitted. let (tx, rx) = oneshot::channel::>(); if let Err(e) = ctx.send_message(&self.sequencer_mailbox, rx).await { + if is_compaction_upload { + PARQUET_COMPACTION_METRICS.record_sequencer_failure( + &compaction_index_id, + compaction_kind, + Duration::ZERO, + ); + } warn!(error = %e, "failed to reserve sequencer position"); return Ok(()); } @@ -207,6 +238,13 @@ impl Handler for ParquetUploader { let kill_switch = ctx.kill_switch().clone(); if kill_switch.is_dead() { + if is_compaction_upload { + PARQUET_COMPACTION_METRICS.record_upload_failure( + &compaction_index_id, + compaction_kind, + Duration::ZERO, + ); + } warn!("kill switch was activated, cancelling metrics upload"); let _ = tx.send(SequencerCommand::Discard); return Err(ActorExitStatus::Killed); @@ -246,10 +284,18 @@ impl Handler for ParquetUploader { } // Stage splits in metastore based on split type + let staging_started_at = Instant::now(); let stage_result = stage_splits(metastore.clone(), index_uid.clone(), &splits).await; if let Err(e) = stage_result { + if is_compaction_upload { + PARQUET_COMPACTION_METRICS.record_stage_failure( + &compaction_index_id, + compaction_kind, + staging_started_at.elapsed(), + ); + } warn!(error = %e, "failed to stage splits"); // Discard sequencer position on error let _ = tx.send(SequencerCommand::Discard); @@ -260,6 +306,14 @@ impl Handler for ParquetUploader { counters .num_staged_splits .fetch_add(splits.len() as u64, Ordering::SeqCst); + if is_compaction_upload { + PARQUET_COMPACTION_METRICS.record_stage_success( + &compaction_index_id, + compaction_kind, + splits.len(), + staging_started_at.elapsed(), + ); + } info!( index_uid = %index_uid, num_splits = splits.len(), @@ -267,6 +321,8 @@ impl Handler for ParquetUploader { ); // Upload Parquet files to storage + let upload_started_at = Instant::now(); + let mut uploaded_bytes = 0u64; for split in &splits { let parquet_file = split.parquet_filename(); // Read the local Parquet file from output_dir @@ -281,6 +337,13 @@ impl Handler for ParquetUploader { parquet_file = %parquet_file, "failed to read local parquet file" ); + if is_compaction_upload { + PARQUET_COMPACTION_METRICS.record_upload_failure( + &compaction_index_id, + compaction_kind, + upload_started_at.elapsed(), + ); + } // Discard sequencer position on error let _ = tx.send(SequencerCommand::Discard); kill_switch.kill(); @@ -299,6 +362,13 @@ impl Handler for ParquetUploader { parquet_file = %parquet_file, "failed to upload parquet file" ); + if is_compaction_upload { + PARQUET_COMPACTION_METRICS.record_upload_failure( + &compaction_index_id, + compaction_kind, + upload_started_at.elapsed(), + ); + } // Discard sequencer position on error let _ = 
tx.send(SequencerCommand::Discard); kill_switch.kill(); @@ -306,6 +376,7 @@ impl Handler for ParquetUploader { } counters.num_uploaded_splits.fetch_add(1, Ordering::SeqCst); + uploaded_bytes += file_size as u64; // Delete the local parquet file after successful upload. if let Err(e) = tokio::fs::remove_file(&local_path).await { @@ -323,6 +394,15 @@ impl Handler for ParquetUploader { "uploaded parquet file to storage" ); } + if is_compaction_upload { + PARQUET_COMPACTION_METRICS.record_upload_success( + &compaction_index_id, + compaction_kind, + splits.len(), + uploaded_bytes, + upload_started_at.elapsed(), + ); + } // Create ParquetSplitsUpdate and send downstream. // The merge permit (if present) transfers to the update so it @@ -339,6 +419,13 @@ impl Handler for ParquetUploader { }; if tx.send(SequencerCommand::Proceed(update)).is_err() { + if is_compaction_upload { + PARQUET_COMPACTION_METRICS.record_sequencer_failure( + &compaction_index_id, + compaction_kind, + Duration::ZERO, + ); + } warn!("sequencer receiver dropped"); } diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/publisher_impl.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/publisher_impl.rs index dc56d129578..6770f07ed46 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/publisher_impl.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/publisher_impl.rs @@ -15,6 +15,8 @@ //! `Handler` implementation for `Publisher`, //! specific to the metrics pipeline. +use std::time::Instant; + use anyhow::Context; use async_trait::async_trait; use quickwit_actors::{ActorContext, ActorExitStatus, Handler}; @@ -24,6 +26,7 @@ use quickwit_proto::metastore::{ use tracing::{info, instrument}; use super::ParquetSplitsUpdate; +use super::parquet_compaction_metrics::{PARQUET_COMPACTION_METRICS, kind_from_index_id}; use crate::actors::publisher::{Publisher, serialize_checkpoint_delta, suggest_truncate}; pub(crate) const METRICS_PUBLISHER_NAME: &str = "ParquetPublisher"; @@ -53,31 +56,66 @@ impl Handler for Publisher { .iter() .map(|split| split.split_id.as_str().to_string()) .collect(); + let is_compaction_publish = !replaced_split_ids.is_empty(); + let compaction_kind = new_splits + .first() + .map(|split| split.kind) + .unwrap_or_else(|| kind_from_index_id(&index_uid.index_id)); + let publish_started_at = Instant::now(); if let Some(_guard) = publish_lock.acquire().await { - if quickwit_common::is_sketches_index(&index_uid.index_id) { - let publish_request = PublishSketchSplitsRequest { - index_uid: Some(index_uid.clone()), - staged_split_ids: split_ids.clone(), - replaced_split_ids: replaced_split_ids.clone(), - index_checkpoint_delta_json_opt, - publish_token_opt: publish_token_opt.clone(), - }; - ctx.protect_future(self.metastore.publish_sketch_splits(publish_request)) - .await - .context("failed to publish sketch splits")?; - } else { - let publish_request = PublishMetricsSplitsRequest { - index_uid: Some(index_uid.clone()), - staged_split_ids: split_ids.clone(), - replaced_split_ids: replaced_split_ids.clone(), - index_checkpoint_delta_json_opt, - publish_token_opt: publish_token_opt.clone(), + let publish_result: anyhow::Result<()> = + if quickwit_common::is_sketches_index(&index_uid.index_id) { + let publish_request = PublishSketchSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: split_ids.clone(), + replaced_split_ids: replaced_split_ids.clone(), + index_checkpoint_delta_json_opt, + publish_token_opt: publish_token_opt.clone(), + }; + 
ctx.protect_future(self.metastore.publish_sketch_splits(publish_request)) + .await + .context("failed to publish sketch splits") + .map(|_| ()) + } else { + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: split_ids.clone(), + replaced_split_ids: replaced_split_ids.clone(), + index_checkpoint_delta_json_opt, + publish_token_opt: publish_token_opt.clone(), + }; + ctx.protect_future(self.metastore.publish_metrics_splits(publish_request)) + .await + .context("failed to publish metrics splits") + .map(|_| ()) }; - ctx.protect_future(self.metastore.publish_metrics_splits(publish_request)) - .await - .context("failed to publish metrics splits")?; + if let Err(error) = publish_result { + if is_compaction_publish { + PARQUET_COMPACTION_METRICS.record_publish_failure( + &index_uid.index_id, + compaction_kind, + publish_started_at.elapsed(), + ); + } + return Err(error.into()); + } + if is_compaction_publish { + PARQUET_COMPACTION_METRICS.record_publish_success( + &index_uid.index_id, + compaction_kind, + split_ids.len(), + replaced_split_ids.len(), + publish_started_at.elapsed(), + ); } } else { + if is_compaction_publish { + PARQUET_COMPACTION_METRICS.record_publish_failure( + &index_uid.index_id, + compaction_kind, + publish_started_at.elapsed(), + ); + } info!( split_ids=?split_ids, "Splits' publish lock is dead."