diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs b/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs
index 4314f726611..23b1411213e 100644
--- a/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs
+++ b/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs
@@ -37,6 +37,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion_datasource::PartitionedFile;
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
 use datafusion_datasource_parquet::source::ParquetSource;
 use datafusion_physical_plan::expressions::Column;
 use mini_moka::sync::Cache;
@@ -230,7 +231,7 @@ impl TableProvider for MetricsTableProvider {
     async fn scan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
@@ -273,11 +274,20 @@ impl TableProvider for MetricsTableProvider {
         // Configure ParquetSource with bloom filters + pushdown enabled
         let table_schema: datafusion_datasource::TableSchema = self.schema.clone().into();
+        let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
+        let object_store = state
+            .runtime_env()
+            .object_store(self.object_store_url.clone())?;
+        let reader_factory = Arc::new(CachedParquetFileReaderFactory::new(
+            object_store,
+            metadata_cache,
+        ));
         let parquet_source = ParquetSource::new(table_schema)
             .with_bloom_filter_on_read(true)
             .with_pushdown_filters(true)
             .with_reorder_filters(true)
-            .with_enable_page_index(true);
+            .with_enable_page_index(true)
+            .with_parquet_file_reader_factory(reader_factory);

         // Build the FileScanConfig
         let mut builder =
@@ -654,6 +664,7 @@ mod tests {
     };

     use super::*;
+    use crate::sources::metrics::test_utils::TestSplitProvider;

     fn schema_with_columns(columns: &[&str]) -> SchemaRef {
         let fields = columns
@@ -757,6 +768,31 @@ mod tests {
         assert_metadata_pruned_split_ids(splits, filters, &[]);
     }

+    #[tokio::test]
+    async fn scan_installs_cached_parquet_reader_factory() {
+        let schema = schema_with_columns(&["metric_name", "timestamp_secs", "value"]);
+        let split_provider = Arc::new(TestSplitProvider::new(Vec::new()));
+        let provider =
+            MetricsTableProvider::new(schema, split_provider, Uri::for_test("file:///metrics"))
+                .unwrap();
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+
+        let plan = provider.scan(&state, None, &[], None).await.unwrap();
+        let data_source_exec = plan
+            .as_any()
+            .downcast_ref::<DataSourceExec>()
+            .expect("metrics scan should produce DataSourceExec");
+        let (_file_scan, parquet_source) = data_source_exec
+            .downcast_to_file_source::<ParquetSource>()
+            .expect("metrics scan should use ParquetSource");
+
+        assert!(
+            parquet_source.parquet_file_reader_factory().is_some(),
+            "metrics scans should use DataFusion's metadata-caching parquet reader"
+        );
+    }
+
     #[test]
     fn metrics_output_ordering_stops_at_first_missing_sort_key() {
         let schema = schema_with_columns(&["metric_name", "service", "timestamp_secs"]);
diff --git a/quickwit/quickwit-df-core/src/session.rs b/quickwit/quickwit-df-core/src/session.rs
index ecc63a3fcb7..245fdfa870c 100644
--- a/quickwit/quickwit-df-core/src/session.rs
+++ b/quickwit/quickwit-df-core/src/session.rs
@@ -56,6 +56,10 @@ use crate::task_estimator::DataSourceExecPartitionEstimator;
 type CatalogProviderFactory = Arc Arc + Send + Sync>;
 type SchemaProviderFactory = Arc Arc + Send + Sync>;

+/// Default per-node DataFusion cache budget for file-embedded metadata such
+/// as Parquet footers and page indexes.
+pub const DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES: usize = 4usize * 1024 * 1024 * 1024;
+
 #[derive(Clone)]
 pub(crate) struct CatalogRegistration {
     name: String,
@@ -80,6 +84,7 @@ pub struct DataFusionSessionBuilder {
     task_estimator: Arc,
     memory_pool: Option>,
     object_store_registry: Option>,
+    file_metadata_cache_limit_bytes: usize,
     runtime: Arc,
 }
@@ -91,6 +96,10 @@ impl std::fmt::Debug for DataFusionSessionBuilder {
             .field("num_catalogs", &self.catalog_registrations.len())
             .field("num_schemas", &self.schema_registrations.len())
             .field("distributed", &self.worker_resolver.is_some())
+            .field(
+                "file_metadata_cache_limit_bytes",
+                &self.file_metadata_cache_limit_bytes,
+            )
             .finish()
     }
 }
@@ -103,6 +112,10 @@ impl Default for DataFusionSessionBuilder {

 impl DataFusionSessionBuilder {
     pub fn new() -> Self {
+        let runtime = RuntimeEnvBuilder::new()
+            .with_metadata_cache_limit(DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES)
+            .build_arc()
+            .expect("default DataFusion runtime should build");
         Self {
             runtime_plugins: Vec::new(),
             substrait_extensions: Vec::new(),
@@ -112,12 +125,14 @@ impl DataFusionSessionBuilder {
             task_estimator: Arc::new(DataSourceExecPartitionEstimator),
             memory_pool: None,
             object_store_registry: None,
-            runtime: Arc::new(RuntimeEnv::default()),
+            file_metadata_cache_limit_bytes: DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES,
+            runtime,
         }
     }

     fn rebuild_runtime(&mut self) -> DFResult<()> {
-        let mut builder = RuntimeEnvBuilder::new();
+        let mut builder = RuntimeEnvBuilder::new()
+            .with_metadata_cache_limit(self.file_metadata_cache_limit_bytes);
         if let Some(memory_pool) = &self.memory_pool {
             builder = builder.with_memory_pool(Arc::clone(memory_pool));
         }
@@ -176,6 +191,12 @@ impl DataFusionSessionBuilder {
         Ok(self)
     }

+    pub fn with_file_metadata_cache_limit(mut self, bytes: usize) -> DFResult<Self> {
+        self.file_metadata_cache_limit_bytes = bytes;
+        self.rebuild_runtime()?;
+        Ok(self)
+    }
+
     pub fn with_object_store_registry(
         mut self,
         registry: Arc,
@@ -411,6 +432,21 @@ mod tests {
         }
     }

+    #[test]
+    fn file_metadata_cache_limit_defaults_to_four_gib_and_can_be_overridden() {
+        let builder = DataFusionSessionBuilder::new();
+        assert_eq!(
+            builder.runtime().cache_manager.get_metadata_cache_limit(),
+            DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES
+        );
+
+        let builder = builder.with_file_metadata_cache_limit(8 * 1024).unwrap();
+        assert_eq!(
+            builder.runtime().cache_manager.get_metadata_cache_limit(),
+            8 * 1024
+        );
+    }
+
     #[test]
     fn runtime_settings_compose_and_reinitialize_sources() {
         let source_url = Url::parse("test://source").unwrap();
@@ -442,6 +478,10 @@ mod tests {
                 .and_then(|entry| entry.value),
             Some("2K".to_string())
         );
+        assert_eq!(
+            builder.runtime().cache_manager.get_metadata_cache_limit(),
+            DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES
+        );
         assert!(
             builder
                 .runtime()
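
Usage sketch (not part of the patch): how a caller might override the new file-metadata cache budget through the builder API added in quickwit-df-core/src/session.rs. The `quickwit_df_core::session` import path and the `.expect(...)` error handling are assumptions; only `with_file_metadata_cache_limit` and `DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES` come from the diff above.

use quickwit_df_core::session::{
    DataFusionSessionBuilder, DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES,
};

fn main() {
    // The default budget introduced by this change is 4 GiB per node.
    assert_eq!(DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES, 4 * 1024 * 1024 * 1024);

    // Shrink the Parquet footer / page-index cache to 1 GiB on a smaller node;
    // the builder rebuilds its RuntimeEnv with the new limit.
    let _builder = DataFusionSessionBuilder::new()
        .with_file_metadata_cache_limit(1024 * 1024 * 1024)
        .expect("rebuilding the DataFusion runtime with the new cache limit should succeed");
}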