Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ impl IndexingService {
let partition_key = RoutingExpr::new(partition_key_str).map_err(|error| {
IndexingError::Internal(format!("failed to parse partition_key: {error}"))
})?;
let parquet_merge_policy = crate::merge_policy::parquet_merge_policy_from_settings(
&index_config.indexing_settings,
);

// Spawn the Parquet merge pipeline (or reuse an existing one for this
// index). The planner mailbox is wired into the MetricsPipeline's
Expand Down Expand Up @@ -97,6 +100,7 @@ impl IndexingService {
use_sketch_processors,
partition_key,
max_num_partitions: index_config.doc_mapping.max_num_partitions,
parquet_merge_policy,
parquet_merge_planner_mailbox_opt: Some(merge_planner_mailbox),
};
let pipeline = MetricsPipeline::new(pipeline_params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,9 @@ mod tests {
ram_storage,
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);
let (uploader_mailbox, _uploader_handle) = universe.spawn_builder().spawn(uploader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ async fn test_metrics_pipeline_e2e() {
ram_storage,
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);
let (uploader_mailbox, _uploader_handle) = universe.spawn_builder().spawn(uploader);

Expand Down Expand Up @@ -525,6 +528,9 @@ async fn test_sketch_pipeline_e2e() {
ram_storage.clone(),
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);
let (uploader_mailbox, _uploader_handle) = universe.spawn_builder().spawn(uploader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,9 @@ mod tests {
ram_storage,
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);
universe.spawn_builder().spawn(uploader)
}
Expand All @@ -849,6 +852,9 @@ mod tests {
ram_storage,
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);
universe.spawn_builder().spawn(uploader)
}
Expand Down Expand Up @@ -1614,6 +1620,9 @@ mod tests {
ram_storage,
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);
let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use quickwit_parquet_engine::split::ParquetSplitMetadata;
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::IndexUid;
use quickwit_storage::Storage;
use time::OffsetDateTime;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

Expand Down Expand Up @@ -289,6 +290,7 @@ impl ParquetMergePipeline {
self.params.storage.clone(),
sequencer_mailbox,
self.params.max_concurrent_split_uploads,
self.params.merge_policy.clone(),
);
let (merge_uploader_mailbox, merge_uploader_handle) = ctx
.spawn_actor()
Expand Down Expand Up @@ -405,15 +407,16 @@ impl ParquetMergePipeline {
Ok(())
}

/// Fetch published Parquet splits from the metastore for merge planning.
/// Fetch immature published Parquet splits from the metastore for merge planning.
///
/// On first spawn, uses the initial splits provided by the IndexingService
/// (avoids per-pipeline metastore queries when many pipelines start).
/// On subsequent spawns (after crash/respawn), queries the metastore
/// directly to recover splits that were in-flight during the crash.
///
/// The planner's `record_splits_if_necessary` filters out mature splits,
/// so we don't need to filter here.
/// The planner re-checks maturity before recording splits, so this DB-side
/// filter is an optimization and crash-recovery guard, not the only line of
/// defense.
async fn fetch_immature_splits(
&mut self,
ctx: &ActorContext<Self>,
Expand All @@ -426,7 +429,8 @@ impl ParquetMergePipeline {
// Dispatch to the correct RPC based on whether this is a metrics or
// sketches index — they use separate Postgres tables.
let index_uid = self.params.index_uid.clone();
let query = quickwit_metastore::ListParquetSplitsQuery::for_index(index_uid.clone());
let query = quickwit_metastore::ListParquetSplitsQuery::for_index(index_uid.clone())
.retain_immature(OffsetDateTime::now_utc());
let is_sketch = quickwit_common::is_sketches_index(&index_uid.index_id);
let records = if is_sketch {
let list_request = quickwit_proto::metastore::ListSketchSplitsRequest::try_from_query(
Expand Down Expand Up @@ -588,7 +592,7 @@ mod tests {
use quickwit_actors::{ActorExitStatus, Universe};
use quickwit_common::temp_dir::TempDirectory;
use quickwit_parquet_engine::merge::policy::{
ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig,
ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig, ParquetSplitMaturity,
};
use quickwit_parquet_engine::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange};
use quickwit_proto::metastore::{MetastoreServiceClient, MockMetastoreService};
Expand Down Expand Up @@ -638,6 +642,9 @@ mod tests {
.sort_fields("metric_name|host|timestamp_secs/V2")
.window_start_secs(0)
.window_duration_secs(3600)
.maturity(ParquetSplitMaturity::Immature {
maturation_period: Duration::from_secs(3600),
})
.build()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::test_utils::wait_until_predicate;
use quickwit_metastore::StageParquetSplitsRequestExt;
use quickwit_parquet_engine::merge::policy::{
ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig,
ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig, ParquetSplitMaturity,
};
use quickwit_parquet_engine::sorted_series::SORTED_SERIES_COLUMN;
use quickwit_parquet_engine::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange};
Expand Down Expand Up @@ -165,6 +165,9 @@ fn make_test_split_metadata(
.sort_fields(table_config.effective_sort_fields())
.window_start_secs(0)
.window_duration_secs(900)
.maturity(ParquetSplitMaturity::Immature {
maturation_period: Duration::from_secs(3600),
})
.add_metric_name(metric_name)
.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,21 +201,16 @@ impl ParquetMergePlanner {
fn record_splits_if_necessary(&mut self, splits: Vec<ParquetSplitMetadata>) {
let now = std::time::SystemTime::now();
for split in splits {
match self
if let ParquetSplitMaturity::Mature = self
.merge_policy
.split_maturity(split.size_bytes, split.num_merge_ops)
{
ParquetSplitMaturity::Mature => continue,
ParquetSplitMaturity::Immature {
maturation_period, ..
} => {
// A split that has lived past its maturation period is
// effectively mature — no further merges needed. This
// mirrors the Tantivy merge planner's `is_mature(now)`.
if split.created_at + maturation_period <= now {
continue;
}
}
// This can happen if the merge policy changed (e.g. decreased
// target_split_size_bytes or max_merge_ops).
continue;
}
if split.is_mature(now) {
continue;
}
if !self.acknowledge_split(split.split_id.as_str()) {
continue;
Expand Down Expand Up @@ -355,7 +350,7 @@ mod tests {

use quickwit_actors::Universe;
use quickwit_parquet_engine::merge::policy::{
ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig,
ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig, ParquetSplitMaturity,
};
use quickwit_parquet_engine::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange};

Expand All @@ -373,6 +368,9 @@ mod tests {
.sort_fields("metric_name|host|timestamp_secs/V2")
.window_start_secs(0)
.window_duration_secs(3600)
.maturity(ParquetSplitMaturity::Immature {
maturation_period: Duration::from_secs(3600),
})
.num_merge_ops(num_merge_ops)
.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ mod tests {
ram_storage,
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);
universe.spawn_builder().spawn(uploader)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::spawn_named_task;
use quickwit_metastore::StageParquetSplitsRequestExt;
use quickwit_parquet_engine::merge::policy::ParquetMergePolicy;
use quickwit_parquet_engine::split::{ParquetSplitKind, ParquetSplitMetadata};
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
use quickwit_storage::Storage;
Expand Down Expand Up @@ -93,6 +94,7 @@ pub struct ParquetUploader {
split_store: Arc<dyn Storage>,
sequencer_mailbox: Mailbox<Sequencer<Publisher>>,
max_concurrent_uploads: usize,
merge_policy: Arc<dyn ParquetMergePolicy>,
counters: UploaderCounters,
}

Expand All @@ -104,13 +106,15 @@ impl ParquetUploader {
split_store: Arc<dyn Storage>,
sequencer_mailbox: Mailbox<Sequencer<Publisher>>,
max_concurrent_uploads: usize,
merge_policy: Arc<dyn ParquetMergePolicy>,
) -> Self {
Self {
uploader_type,
metastore,
split_store,
sequencer_mailbox,
max_concurrent_uploads,
merge_policy,
counters: Default::default(),
}
}
Expand Down Expand Up @@ -228,13 +232,14 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
// Clone what we need for the async task
let metastore = self.metastore.clone();
let split_store = self.split_store.clone();
let merge_policy = self.merge_policy.clone();
let counters = self.counters.clone();

let output_dir = batch.output_dir;
let checkpoint_delta_opt = batch.checkpoint_delta_opt;
let publish_lock = batch.publish_lock;
let publish_token_opt = batch.publish_token_opt;
let splits = batch.splits;
let mut splits = batch.splits;
let replaced_split_ids = batch.replaced_split_ids;
let merge_task_opt = batch._merge_task_opt;
// Hold the scratch directory alive until the upload task completes.
Expand All @@ -258,6 +263,11 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
return;
}

for split in &mut splits {
split.maturity =
merge_policy.split_maturity(split.size_bytes, split.num_merge_ops);
}

// Stage splits in metastore based on split type
let stage_result =
stage_splits(metastore.clone(), index_uid.clone(), &splits).await;
Expand Down Expand Up @@ -422,7 +432,20 @@ mod tests {
let mut mock_metastore = MockMetastoreService::new();
mock_metastore
.expect_stage_metrics_splits()
.withf(|request| request.index_uid().index_id == "test-index")
.withf(|request| {
if request.index_uid().index_id != "test-index" {
return false;
}
let splits = request.deserialize_splits_metadata().unwrap();
matches!(
splits.as_slice(),
[split]
if matches!(
split.maturity,
quickwit_parquet_engine::merge::policy::ParquetSplitMaturity::Immature { .. }
)
)
})
.times(1)
.returning(|_| Ok(EmptyResponse {}));

Expand All @@ -433,6 +456,9 @@ mod tests {
ram_storage.clone(),
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);

let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader);
Expand Down Expand Up @@ -522,6 +548,9 @@ mod tests {
ram_storage.clone(),
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);

let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader);
Expand Down Expand Up @@ -609,6 +638,9 @@ mod tests {
ram_storage.clone(),
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);

let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader);
Expand Down Expand Up @@ -675,6 +707,9 @@ mod tests {
ram_storage.clone(),
sequencer_mailbox,
4,
crate::merge_policy::parquet_merge_policy_from_settings(
&quickwit_config::IndexingSettings::default(),
),
);

let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub struct MetricsPipelineParams {
pub partition_key: quickwit_doc_mapper::RoutingExpr,
/// Maximum number of index partitions allowed in a workbench.
pub max_num_partitions: NonZeroU32,
/// Parquet merge policy used to assign maturity to newly produced splits.
pub parquet_merge_policy: Arc<dyn quickwit_parquet_engine::merge::policy::ParquetMergePolicy>,
/// Parquet merge planner mailbox for the publisher feedback loop.
/// When set, the publisher sends ParquetNewSplits to the planner
/// after publishing ingest splits so they can be considered for merging.
Expand Down Expand Up @@ -351,6 +353,7 @@ impl MetricsPipeline {
self.params.storage.clone(),
sequencer_mailbox,
self.params.max_concurrent_split_uploads,
self.params.parquet_merge_policy.clone(),
);
let (uploader_mailbox, uploader_handle) = ctx
.spawn_actor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ fn stage_parquet_splits(
let now = OffsetDateTime::now_utc().unix_timestamp();
for metadata in splits_metadata {
let split_id = metadata.split_id.as_str().to_string();
let maturity_timestamp = metadata.maturity_timestamp_secs();

if let Some(existing) = splits_map.get(&split_id)
&& existing.state != SplitState::Staged
Expand All @@ -931,7 +932,7 @@ fn stage_parquet_splits(
create_timestamp: now,
node_id: String::new(),
delete_opstamp: 0,
maturity_timestamp: 0,
maturity_timestamp,
};
splits_map.insert(split_id, stored);
}
Expand Down
Loading
Loading