diff --git a/quickwit/quickwit-datafusion/src/object_store_registry.rs b/quickwit/quickwit-datafusion/src/object_store_registry.rs index 869af0e26a9..0c669b1b5f1 100644 --- a/quickwit/quickwit-datafusion/src/object_store_registry.rs +++ b/quickwit/quickwit-datafusion/src/object_store_registry.rs @@ -51,7 +51,7 @@ use datafusion::common::{DataFusionError, Result as DFResult}; use datafusion::execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}; use object_store::ObjectStore; use quickwit_common::uri::Uri; -use quickwit_storage::StorageResolver; +use quickwit_storage::{StorageCache, StorageResolver}; use url::Url; use crate::storage_bridge::QuickwitObjectStore; @@ -69,6 +69,7 @@ pub struct QuickwitObjectStoreRegistry { /// fallback. default: DefaultObjectStoreRegistry, storage_resolver: StorageResolver, + storage_cache: Option<Arc<dyn StorageCache>>, /// Lazy wrappers constructed by `get_store` on demand, keyed by /// `scheme://authority`. Plain `RwLock` is fine — contention /// is negligible because the write lock is only taken once per unique @@ -92,10 +93,16 @@ impl QuickwitObjectStoreRegistry { Self { default: DefaultObjectStoreRegistry::new(), storage_resolver, + storage_cache: None, lazy_stores: RwLock::new(HashMap::new()), } } + pub fn with_storage_cache(mut self, storage_cache: Arc<dyn StorageCache>) -> Self { + self.storage_cache = Some(storage_cache); + self + } + /// Canonical cache key mirroring DataFusion's `DefaultObjectStoreRegistry`: /// `scheme://authority`.
Preserves the authority so indexes in /// different buckets stay distinct; paths within an authority share @@ -138,8 +145,11 @@ impl ObjectStoreRegistry for QuickwitObjectStoreRegistry { "failed to build Quickwit URI from `{key}`: {err}" )))) })?; - let store: Arc<dyn ObjectStore> = - Arc::new(QuickwitObjectStore::new(uri, self.storage_resolver.clone())); + let mut quickwit_store = QuickwitObjectStore::new(uri, self.storage_resolver.clone()); + if let Some(storage_cache) = &self.storage_cache { + quickwit_store = quickwit_store.with_storage_cache(Arc::clone(storage_cache)); + } + let store: Arc<dyn ObjectStore> = Arc::new(quickwit_store); let mut write = self .lazy_stores .write() diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs b/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs index 0e50e203fb3..afd2d50dbd0 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs @@ -24,6 +24,7 @@ pub(crate) mod factory; pub(crate) mod index_resolver; pub(crate) mod metastore_provider; pub(crate) mod optimizer; +pub(crate) mod parquet_cache_metrics; pub(crate) mod predicate; pub(crate) mod sketch_udf; pub(crate) mod table_provider; @@ -50,6 +51,7 @@ use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient}; use self::factory::{METRICS_FILE_TYPE, MetricsTableProviderFactory, SKETCHES_FILE_TYPE}; use self::index_resolver::{MetastoreIndexResolver, MetricsIndexResolver}; use self::optimizer::SortedSeriesStreamingAggregateRule; +pub use self::parquet_cache_metrics::instrument_parquet_range_cache_metrics; use self::sketch_udf::{create_dd_quantile_udf, create_dd_sketch_udaf}; use self::table_provider::MetricsTableProvider; diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/parquet_cache_metrics.rs b/quickwit/quickwit-datafusion/src/sources/metrics/parquet_cache_metrics.rs new file mode 100644 index 00000000000..2b9ccd8a82b --- /dev/null +++
b/quickwit/quickwit-datafusion/src/sources/metrics/parquet_cache_metrics.rs @@ -0,0 +1,227 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Per-query cache metrics for metrics parquet scans. + +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::datasource::source::DataSourceExec; +use datafusion::error::Result as DFResult; +use datafusion::parquet; +use datafusion::parquet::arrow::arrow_reader::ArrowReaderOptions; +use datafusion::parquet::arrow::async_reader::AsyncFileReader; +use datafusion::parquet::file::metadata::ParquetMetaData; +use datafusion::physical_plan::ExecutionPlan; +use datafusion_datasource::PartitionedFile; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource_parquet::ParquetFileReaderFactory; +use datafusion_datasource_parquet::source::ParquetSource; +use datafusion_physical_plan::metrics::{ + Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, +}; +use futures::FutureExt; +use futures::future::BoxFuture; +use quickwit_storage::{ + StorageCacheMetrics, StorageCacheMetricsSnapshot, with_storage_cache_metrics, +}; +use tracing::warn; + +pub(super) fn instrument_parquet_file_reader_factory( + inner: Arc<dyn ParquetFileReaderFactory>, +) -> Arc<dyn ParquetFileReaderFactory> { + Arc::new(InstrumentedParquetFileReaderFactory { inner }) +} + +pub fn instrument_parquet_range_cache_metrics( + plan: Arc<dyn ExecutionPlan>, +) -> Arc<dyn ExecutionPlan> { +
match Arc::clone(&plan).transform_up(|plan| { + if let Some(rewritten) = instrument_parquet_scan(&plan) { + Ok(Transformed::yes(rewritten)) + } else { + Ok(Transformed::no(plan)) + } + }) { + Ok(transformed) => transformed.data, + Err(error) => { + warn!(%error, "failed to install parquet cache metrics on worker plan"); + plan + } + } +} + +fn instrument_parquet_scan(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> { + let data_source_exec = plan.as_any().downcast_ref::<DataSourceExec>()?; + let (file_scan_config, parquet_source) = + data_source_exec.downcast_to_file_source::<ParquetSource>()?; + let reader_factory = parquet_source.parquet_file_reader_factory()?.clone(); + + let parquet_source = parquet_source + .clone() + .with_parquet_file_reader_factory(instrument_parquet_file_reader_factory(reader_factory)); + let file_scan_config = FileScanConfigBuilder::from(file_scan_config.clone()) + .with_source(Arc::new(parquet_source)) + .build(); + let rewritten: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(file_scan_config); + Some(rewritten) +} + +#[derive(Debug)] +struct InstrumentedParquetFileReaderFactory { + inner: Arc<dyn ParquetFileReaderFactory>, +} + +impl ParquetFileReaderFactory for InstrumentedParquetFileReaderFactory { + fn create_reader( + &self, + partition_index: usize, + partitioned_file: PartitionedFile, + metadata_size_hint: Option<usize>, + metrics: &ExecutionPlanMetricsSet, + ) -> DFResult<Box<dyn AsyncFileReader + Send>> { + let cache_metrics = Arc::new(StorageCacheMetrics::default()); + let counters = StorageCacheMetricCounters::new(partition_index, metrics); + let reader = self.inner.create_reader( + partition_index, + partitioned_file, + metadata_size_hint, + metrics, + )?; + Ok(Box::new(StorageCacheObservedReader { + inner: reader, + cache_metrics, + counters, + })) + } +} + +#[derive(Clone)] +struct StorageCacheMetricCounters { + range_hit_bytes: Count, + range_miss_bytes: Count, + range_hit_count: Count, + range_miss_count: Count, + footer_hit_count: Count, + footer_miss_count: Count, +} + +impl StorageCacheMetricCounters { + fn new(partition_index: usize, metrics:
&ExecutionPlanMetricsSet) -> Self { + let builder = || MetricBuilder::new(metrics).with_type(MetricType::SUMMARY); + Self { + range_hit_bytes: builder().counter("parquet_range_cache_hit_bytes", partition_index), + range_miss_bytes: builder().counter("parquet_range_cache_miss_bytes", partition_index), + range_hit_count: builder().counter("parquet_range_cache_hit_count", partition_index), + range_miss_count: builder().counter("parquet_range_cache_miss_count", partition_index), + footer_hit_count: builder().counter("parquet_footer_cache_hit_count", partition_index), + footer_miss_count: builder() + .counter("parquet_footer_cache_miss_count", partition_index), + } + } + + fn record_range_delta(&self, delta: StorageCacheMetricsSnapshot) { + self.range_hit_bytes.add(delta.hit_bytes); + self.range_miss_bytes.add(delta.miss_bytes); + self.range_hit_count.add(delta.hit_count); + self.range_miss_count.add(delta.miss_count); + } + + fn record_footer_observation(&self, delta: StorageCacheMetricsSnapshot) { + if delta.hit_count == 0 && delta.miss_count == 0 { + self.footer_hit_count.add(1); + } else { + self.footer_miss_count.add(1); + } + } +} + +struct StorageCacheObservedReader { + inner: Box<dyn AsyncFileReader + Send>, + cache_metrics: Arc<StorageCacheMetrics>, + counters: StorageCacheMetricCounters, +} + +fn observe_storage_cache_activity<'a, T>( + cache_metrics: Arc<StorageCacheMetrics>, + counters: StorageCacheMetricCounters, + before: StorageCacheMetricsSnapshot, + future: BoxFuture<'a, parquet::errors::Result<T>>, +) -> BoxFuture<'a, parquet::errors::Result<T>> +where + T: Send + 'a, +{ + async move { + let result = with_storage_cache_metrics(Arc::clone(&cache_metrics), future).await; + let delta = cache_metrics.snapshot().saturating_delta_since(before); + counters.record_range_delta(delta); + result + } + .boxed() +} + +fn observe_footer_cache_activity<'a>( + cache_metrics: Arc<StorageCacheMetrics>, + counters: StorageCacheMetricCounters, + before: StorageCacheMetricsSnapshot, + future: BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>>, +) -> BoxFuture<'a,
parquet::errors::Result<Arc<ParquetMetaData>>> { + async move { + let result = with_storage_cache_metrics(Arc::clone(&cache_metrics), future).await; + let delta = cache_metrics.snapshot().saturating_delta_since(before); + counters.record_range_delta(delta); + if result.is_ok() { + counters.record_footer_observation(delta); + } + result + } + .boxed() +} + +impl AsyncFileReader for StorageCacheObservedReader { + fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> { + let before = self.cache_metrics.snapshot(); + let cache_metrics = Arc::clone(&self.cache_metrics); + let counters = self.counters.clone(); + let future = self.inner.get_bytes(range); + observe_storage_cache_activity(cache_metrics, counters, before, future) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec<Range<u64>>, + ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> + where + Self: Send, + { + let before = self.cache_metrics.snapshot(); + let cache_metrics = Arc::clone(&self.cache_metrics); + let counters = self.counters.clone(); + let future = self.inner.get_byte_ranges(ranges); + observe_storage_cache_activity(cache_metrics, counters, before, future) + } + + fn get_metadata<'a>( + &'a mut self, + options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> { + let before = self.cache_metrics.snapshot(); + let cache_metrics = Arc::clone(&self.cache_metrics); + let counters = self.counters.clone(); + let future = self.inner.get_metadata(options); + observe_footer_cache_activity(cache_metrics, counters, before, future) + } +} diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs b/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs index 23b1411213e..ff2debfd50f 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs @@ -50,6 +50,7 @@ use regex_automata::dfa::{Automaton, dense}; use regex_automata::{Anchored, Input}; use tracing::debug; +use
super::parquet_cache_metrics::instrument_parquet_file_reader_factory; use super::predicate; const METRICS_SORT_ORDER: &[&str] = &[ @@ -278,9 +279,8 @@ impl TableProvider for MetricsTableProvider { let object_store = state .runtime_env() .object_store(self.object_store_url.clone())?; - let reader_factory = Arc::new(CachedParquetFileReaderFactory::new( - object_store, - metadata_cache, + let reader_factory = instrument_parquet_file_reader_factory(Arc::new( + CachedParquetFileReaderFactory::new(object_store, metadata_cache), )); let parquet_source = ParquetSource::new(table_schema) .with_bloom_filter_on_read(true) diff --git a/quickwit/quickwit-datafusion/src/storage_bridge.rs b/quickwit/quickwit-datafusion/src/storage_bridge.rs index d1c9b6a096e..fb4ae564352 100644 --- a/quickwit/quickwit-datafusion/src/storage_bridge.rs +++ b/quickwit/quickwit-datafusion/src/storage_bridge.rs @@ -30,6 +30,8 @@ //! operations return `NotSupported` — DataFusion only reads parquet files //! through this store. +use std::ops::Range; +use std::path::{Path, PathBuf}; use std::sync::Arc; use async_trait::async_trait; @@ -43,7 +45,9 @@ use object_store::{ Result as ObjectStoreResult, }; use quickwit_common::uri::Uri; -use quickwit_storage::{Storage, StorageResolver}; +use quickwit_storage::{ + OwnedBytes, Storage, StorageCache, StorageResolver, wrap_storage_with_cache, +}; use tokio::sync::OnceCell; /// Adapts Quickwit's `Storage` trait to DataFusion's `ObjectStore` interface. 
@@ -54,6 +58,7 @@ use tokio::sync::OnceCell; pub struct QuickwitObjectStore { index_uri: Uri, storage_resolver: StorageResolver, + storage_cache: Option<Arc<dyn StorageCache>>, storage: OnceCell<Arc<dyn Storage>>, } @@ -62,27 +67,80 @@ impl QuickwitObjectStore { Self { index_uri, storage_resolver, + storage_cache: None, storage: OnceCell::new(), } } + pub fn with_storage_cache(mut self, storage_cache: Arc<dyn StorageCache>) -> Self { + self.storage_cache = Some(storage_cache); + self + } + /// Returns the handle to the underlying `Storage`, resolving it via the /// `StorageResolver` if this is the first call. async fn storage(&self) -> ObjectStoreResult<&Arc<dyn Storage>> { self.storage .get_or_try_init(|| async { - self.storage_resolver + let storage = self + .storage_resolver .resolve(&self.index_uri) .await .map_err(|err| object_store::Error::Generic { store: "QuickwitObjectStore", source: Box::new(err), - }) + })?; + Ok(match &self.storage_cache { + Some(cache) => wrap_storage_with_cache( + Arc::new(ScopedStorageCache::new( + self.index_uri.as_str().to_string(), + Arc::clone(cache), + )), + storage, + ), + None => storage, + }) }) .await } } +struct ScopedStorageCache { + scope: String, + inner: Arc<dyn StorageCache>, +} + +impl ScopedStorageCache { + fn new(scope: String, inner: Arc<dyn StorageCache>) -> Self { + Self { scope, inner } + } + + fn scoped_path(&self, path: &Path) -> PathBuf { + PathBuf::from(format!("{}#{}", self.scope, path.to_string_lossy())) + } +} + +#[async_trait] +impl StorageCache for ScopedStorageCache { + async fn get(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> { + self.inner.get(&self.scoped_path(path), byte_range).await + } + + async fn get_all(&self, path: &Path) -> Option<OwnedBytes> { + self.inner.get_all(&self.scoped_path(path)).await + } + + async fn put(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes) { + self.inner + .put(self.scoped_path(&path), byte_range, bytes) + .await + } + + async fn put_all(&self, path: PathBuf, bytes: OwnedBytes) { + self.inner.put_all(self.scoped_path(&path), bytes).await + } +} + impl std::fmt::Debug
for QuickwitObjectStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("QuickwitObjectStore") @@ -264,3 +322,73 @@ impl ObjectStore for QuickwitObjectStore { }) } } + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + + use super::*; + + #[derive(Default)] + struct RecordingStorageCache { + paths: Mutex<Vec<PathBuf>>, + } + + impl RecordingStorageCache { + fn paths(&self) -> Vec<PathBuf> { + self.paths.lock().unwrap().clone() + } + + fn record(&self, path: &Path) { + self.paths.lock().unwrap().push(path.to_path_buf()); + } + } + + #[async_trait] + impl StorageCache for RecordingStorageCache { + async fn get(&self, path: &Path, _byte_range: Range<usize>) -> Option<OwnedBytes> { + self.record(path); + None + } + + async fn get_all(&self, path: &Path) -> Option<OwnedBytes> { + self.record(path); + None + } + + async fn put(&self, path: PathBuf, _byte_range: Range<usize>, _bytes: OwnedBytes) { + self.record(&path); + } + + async fn put_all(&self, path: PathBuf, _bytes: OwnedBytes) { + self.record(&path); + } + } + + #[tokio::test] + async fn scoped_storage_cache_prefixes_cache_keys() { + let inner = Arc::new(RecordingStorageCache::default()); + let inner_cache: Arc<dyn StorageCache> = inner.clone(); + let cache = ScopedStorageCache::new("s3://metrics-bucket".to_string(), inner_cache); + + cache + .get(Path::new("indexes/metrics.parquet"), 10..20) + .await; + cache + .put( + Path::new("indexes/metrics.parquet").to_path_buf(), + 10..20, + OwnedBytes::new(&b"bytes"[..]), + ) + .await; + + let paths = inner.paths(); + assert_eq!( + paths, + vec![ + PathBuf::from("s3://metrics-bucket#indexes/metrics.parquet"), + PathBuf::from("s3://metrics-bucket#indexes/metrics.parquet"), + ] + ); + } +} diff --git a/quickwit/quickwit-serve/src/datafusion_api/setup.rs b/quickwit/quickwit-serve/src/datafusion_api/setup.rs index 211efe4b547..f824b2f245b 100644 --- a/quickwit/quickwit-serve/src/datafusion_api/setup.rs +++ b/quickwit/quickwit-serve/src/datafusion_api/setup.rs @@ -20,28 +20,33 @@ use std::collections::BTreeSet;
use std::net::SocketAddr; +use std::ops::Range; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use anyhow::Context; +use async_trait::async_trait; use bytesize::ByteSize; use futures::{StreamExt, stream}; use quickwit_cluster::{ClusterChange, ClusterChangeStream, ClusterNode}; use quickwit_common::tower::Change; -use quickwit_config::NodeConfig; use quickwit_config::service::QuickwitService; +use quickwit_config::{CacheConfig, NodeConfig}; use quickwit_datafusion::grpc::DataFusionServiceGrpcImpl; use quickwit_datafusion::proto::data_fusion_service_server::{ DataFusionServiceServer, SERVICE_NAME as DATAFUSION_SERVICE_NAME, }; -use quickwit_datafusion::sources::metrics::MetricsDataSource; +use quickwit_datafusion::sources::metrics::{ + MetricsDataSource, instrument_parquet_range_cache_metrics, +}; use quickwit_datafusion::{ DataFusionService, DataFusionSessionBuilder, QuickwitObjectStoreRegistry, QuickwitWorkerResolver, build_worker, }; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_search::{SearchServiceClient, SearcherPool, create_search_client_from_grpc_addr}; -use quickwit_storage::StorageResolver; +use quickwit_storage::{MemorySizedCache, OwnedBytes, StorageCache, StorageResolver}; use tokio::time::timeout; use tonic::transport::server::Router; use tonic_reflection::pb::v1::ServerReflectionRequest; @@ -51,6 +56,10 @@ use tonic_reflection::pb::v1::server_reflection_response::MessageResponse; use crate::QuickwitServices; +const DATAFUSION_PARQUET_RANGE_CACHE_CAPACITY_ENV: &str = + "QW_DATAFUSION_PARQUET_RANGE_CACHE_CAPACITY"; +const FULL_SLICE: Range<usize> = 0..usize::MAX; + /// Build the generic DataFusion session builder for this node.
/// /// Returns `None` if the searcher role is disabled or the endpoint toggle is @@ -84,7 +93,13 @@ pub(crate) fn build_datafusion_session_builder( ); let worker_resolver = QuickwitWorkerResolver::new(datafusion_worker_pool) .with_tls(node_config.grpc_config.tls.is_some()); - let registry = Arc::new(QuickwitObjectStoreRegistry::new(storage_resolver)); + let datafusion_parquet_range_cache = datafusion_parquet_range_cache_config(); + let datafusion_storage_cache = + DataFusionParquetRangeCache::from_config(&datafusion_parquet_range_cache); + let registry = Arc::new( + QuickwitObjectStoreRegistry::new(storage_resolver) + .with_storage_cache(Arc::new(datafusion_storage_cache)), + ); let builder = DataFusionSessionBuilder::new() .with_object_store_registry(registry) .context("failed to install DataFusion object store registry")? @@ -97,6 +112,49 @@ Ok(Some(Arc::new(builder))) } +fn datafusion_parquet_range_cache_config() -> CacheConfig { + let capacity = quickwit_common::get_from_env( + DATAFUSION_PARQUET_RANGE_CACHE_CAPACITY_ENV, + ByteSize::gb(4), + false, + ); + CacheConfig::default_with_capacity(capacity) +} + +struct DataFusionParquetRangeCache { + inner: MemorySizedCache, +} + +impl DataFusionParquetRangeCache { + fn from_config(cache_config: &CacheConfig) -> Self { + Self { + inner: MemorySizedCache::from_config( + cache_config, + &quickwit_storage::STORAGE_METRICS.datafusion_parquet_range_cache, + ), + } + } +} + +#[async_trait] +impl StorageCache for DataFusionParquetRangeCache { + async fn get(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> { + self.inner.get_slice(path, byte_range) + } + + async fn get_all(&self, path: &Path) -> Option<OwnedBytes> { + self.inner.get_slice(path, FULL_SLICE) + } + + async fn put(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes) { + self.inner.put_slice(path, byte_range, bytes); + } + + async fn put_all(&self, path: PathBuf, bytes: OwnedBytes) { + self.inner.put_slice(path,
FULL_SLICE, bytes); + } +} + fn setup_datafusion_worker_pool( cluster_change_stream: ClusterChangeStream, max_message_size: ByteSize, @@ -209,7 +267,8 @@ impl DataFusionMount { .max_decoding_message_size(self.max_message_size_bytes) .max_encoding_message_size(self.max_message_size_bytes); - let worker = build_worker(session_builder); + let mut worker = build_worker(session_builder); + worker.add_on_plan_hook(instrument_parquet_range_cache_metrics); router .add_service(query_server) diff --git a/quickwit/quickwit-storage/src/cache/mod.rs b/quickwit/quickwit-storage/src/cache/mod.rs index f73a96b90f4..7b8d27d1e40 100644 --- a/quickwit/quickwit-storage/src/cache/mod.rs +++ b/quickwit/quickwit-storage/src/cache/mod.rs @@ -17,6 +17,7 @@ mod byte_range_cache; mod memory_sized_cache; mod quickwit_cache; mod slice_address; +mod storage_cache_metrics; mod storage_with_cache; mod stored_item; @@ -30,6 +31,10 @@ pub use storage_with_cache::StorageWithCache; pub use self::byte_range_cache::ByteRangeCache; pub use self::memory_sized_cache::MemorySizedCache; +pub use self::storage_cache_metrics::{ + StorageCacheMetrics, StorageCacheMetricsSnapshot, with_storage_cache_metrics, +}; +pub(crate) use self::storage_cache_metrics::{record_storage_cache_hit, record_storage_cache_miss}; use crate::{OwnedBytes, Storage}; /// Wraps the given directory with a slice cache that is actually global diff --git a/quickwit/quickwit-storage/src/cache/storage_cache_metrics.rs b/quickwit/quickwit-storage/src/cache/storage_cache_metrics.rs new file mode 100644 index 00000000000..9c98004110a --- /dev/null +++ b/quickwit/quickwit-storage/src/cache/storage_cache_metrics.rs @@ -0,0 +1,91 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +tokio::task_local! { + static STORAGE_CACHE_METRICS: Arc<StorageCacheMetrics>; +} + +/// Accumulates storage cache activity for the current observed operation. +#[derive(Debug, Default)] +pub struct StorageCacheMetrics { + hit_bytes: AtomicUsize, + miss_bytes: AtomicUsize, + hit_count: AtomicUsize, + miss_count: AtomicUsize, +} + +/// Point-in-time storage cache activity counters. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct StorageCacheMetricsSnapshot { + /// Number of bytes returned by the cache. + pub hit_bytes: usize, + /// Number of bytes fetched from the backing storage after cache misses. + pub miss_bytes: usize, + /// Number of cache reads that returned bytes. + pub hit_count: usize, + /// Number of cache reads that fell through to backing storage. + pub miss_count: usize, +} + +impl StorageCacheMetrics { + /// Returns the current cache activity counters.
+ pub fn snapshot(&self) -> StorageCacheMetricsSnapshot { + StorageCacheMetricsSnapshot { + hit_bytes: self.hit_bytes.load(Ordering::Relaxed), + miss_bytes: self.miss_bytes.load(Ordering::Relaxed), + hit_count: self.hit_count.load(Ordering::Relaxed), + miss_count: self.miss_count.load(Ordering::Relaxed), + } + } + + fn record_hit(&self, num_bytes: usize) { + self.hit_count.fetch_add(1, Ordering::Relaxed); + self.hit_bytes.fetch_add(num_bytes, Ordering::Relaxed); + } + + fn record_miss(&self, num_bytes: usize) { + self.miss_count.fetch_add(1, Ordering::Relaxed); + self.miss_bytes.fetch_add(num_bytes, Ordering::Relaxed); + } +} + +impl StorageCacheMetricsSnapshot { + /// Returns the non-negative delta between this snapshot and an earlier one. + pub fn saturating_delta_since(self, before: Self) -> Self { + Self { + hit_bytes: self.hit_bytes.saturating_sub(before.hit_bytes), + miss_bytes: self.miss_bytes.saturating_sub(before.miss_bytes), + hit_count: self.hit_count.saturating_sub(before.hit_count), + miss_count: self.miss_count.saturating_sub(before.miss_count), + } + } +} + +/// Runs `future` with storage cache activity recorded into `metrics`. 
+pub async fn with_storage_cache_metrics<F, T>(metrics: Arc<StorageCacheMetrics>, future: F) -> T +where F: Future<Output = T> { + STORAGE_CACHE_METRICS.scope(metrics, future).await +} + +pub(crate) fn record_storage_cache_hit(num_bytes: usize) { + let _ = STORAGE_CACHE_METRICS.try_with(|metrics| metrics.record_hit(num_bytes)); +} + +pub(crate) fn record_storage_cache_miss(num_bytes: usize) { + let _ = STORAGE_CACHE_METRICS.try_with(|metrics| metrics.record_miss(num_bytes)); +} diff --git a/quickwit/quickwit-storage/src/cache/storage_with_cache.rs b/quickwit/quickwit-storage/src/cache/storage_with_cache.rs index 79c5f215602..0a1c8b0f32f 100644 --- a/quickwit/quickwit-storage/src/cache/storage_with_cache.rs +++ b/quickwit/quickwit-storage/src/cache/storage_with_cache.rs @@ -21,7 +21,7 @@ use async_trait::async_trait; use quickwit_common::uri::Uri; use tokio::io::AsyncRead; -use crate::cache::StorageCache; +use crate::cache::{StorageCache, record_storage_cache_hit, record_storage_cache_miss}; use crate::storage::SendableAsync; use crate::{BulkDeleteError, OwnedBytes, Storage, StorageResult}; @@ -57,9 +57,11 @@ impl Storage for StorageWithCache { async fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> StorageResult<OwnedBytes> { if let Some(bytes) = self.cache.get(path, byte_range.clone()).await { + record_storage_cache_hit(bytes.len()); Ok(bytes) } else { let bytes = self.storage.get_slice(path, byte_range.clone()).await?; + record_storage_cache_miss(bytes.len()); self.cache .put(path.to_owned(), byte_range, bytes.clone()) .await; @@ -80,9 +82,11 @@ async fn get_all(&self, path: &Path) -> StorageResult<OwnedBytes> { if let Some(bytes) = self.cache.get_all(path).await { + record_storage_cache_hit(bytes.len()); Ok(bytes) } else { let bytes = self.storage.get_all(path).await?; + record_storage_cache_miss(bytes.len()); self.cache.put_all(path.to_owned(), bytes.clone()).await; Ok(bytes) } @@ -116,7 +120,10 @@ mod tests { use std::sync::Mutex; use super::*; - use crate::{MockStorage,
MockStorageCache, OwnedBytes}; + use crate::{ + MockStorage, MockStorageCache, OwnedBytes, StorageCacheMetrics, + StorageCacheMetricsSnapshot, with_storage_cache_metrics, + }; #[tokio::test] async fn put_in_cache_test() { @@ -159,4 +166,58 @@ .unwrap(); assert_eq!(data1, data2); } + + #[tokio::test] + async fn storage_cache_metrics_record_hits_and_misses() { + let mut mock_storage = MockStorage::default(); + let mut mock_cache = MockStorageCache::default(); + let actual_cache: Arc<Mutex<HashMap<PathBuf, OwnedBytes>>> = + Arc::new(Mutex::new(HashMap::new())); + + let cache1 = actual_cache.clone(); + mock_cache + .expect_get_all() + .times(2) + .returning(move |path| cache1.lock().unwrap().get(path).cloned()); + mock_cache + .expect_put_all() + .times(1) + .returning(move |path, data| { + let actual_cache = actual_cache.clone(); + actual_cache.lock().unwrap().insert(path, data); + }); + + mock_storage + .expect_get_all() + .times(1) + .returning(|_path| Ok(OwnedBytes::new(vec![1, 2, 3]))); + + let storage_with_cache = StorageWithCache { + storage: Arc::new(mock_storage), + cache: Arc::new(mock_cache), + }; + let metrics = Arc::new(StorageCacheMetrics::default()); + + with_storage_cache_metrics(Arc::clone(&metrics), async { + storage_with_cache + .get_all(Path::new("cool_file")) + .await + .unwrap(); + storage_with_cache + .get_all(Path::new("cool_file")) + .await + .unwrap(); + }) + .await; + + assert_eq!( + metrics.snapshot(), + StorageCacheMetricsSnapshot { + hit_bytes: 3, + miss_bytes: 3, + hit_count: 1, + miss_count: 1, + } + ); + } } diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index c21ed2a0bf3..680c06fd3f3 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -64,7 +64,8 @@ pub use self::bundle_storage::{BundleStorage, BundleStorageFileOffsets}; #[cfg(any(test, feature = "testsuite"))] pub use self::cache::MockStorageCache; pub use self::cache::{ - ByteRangeCache, MemorySizedCache, QuickwitCache,
StorageCache, wrap_storage_with_cache, + ByteRangeCache, MemorySizedCache, QuickwitCache, StorageCache, StorageCacheMetrics, + StorageCacheMetricsSnapshot, with_storage_cache_metrics, wrap_storage_with_cache, }; pub use self::local_file_storage::{LocalFileStorage, LocalFileStorageFactory}; #[cfg(feature = "azure")] diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 888d137cc18..410ed411149 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -30,6 +30,7 @@ pub struct StorageMetrics { pub predicate_cache: CacheMetrics, pub fd_cache_metrics: CacheMetrics, pub fast_field_cache: CacheMetrics, + pub datafusion_parquet_range_cache: CacheMetrics, pub split_footer_cache: CacheMetrics, pub searcher_split_cache: CacheMetrics, pub get_slice_timeout_successes: [IntCounter; 3], @@ -94,6 +95,7 @@ impl Default for StorageMetrics { StorageMetrics { fast_field_cache: CacheMetrics::for_component("fastfields"), + datafusion_parquet_range_cache: CacheMetrics::for_component("datafusion_parquet_range"), fd_cache_metrics: CacheMetrics::for_component("fd"), partial_request_cache: CacheMetrics::for_component("partial_request"), predicate_cache: CacheMetrics::for_component("predicate"),