diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs
index 1b6dfc27d0c..e053f048cb9 100644
--- a/quickwit/quickwit-jaeger/src/lib.rs
+++ b/quickwit/quickwit-jaeger/src/lib.rs
@@ -2723,6 +2723,7 @@ mod tests {
                 scroll_id: None,
                 failed_splits: Vec::new(),
                 num_successful_splits: 1,
+                resource_stats: None,
             })
         });
diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto
index a49b951b3ca..c5066022db6 100644
--- a/quickwit/quickwit-proto/protos/quickwit/search.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -319,6 +319,11 @@ message SearchResponse {
   // Total number of successful splits searched.
   uint64 num_successful_splits = 8;
+
+  // Per-request execution telemetry. See `RootResourceStats` for the
+  // execution model and a diagnostic playbook. Optional for backwards
+  // compatibility — older servers / clients may omit this.
+  optional RootResourceStats resource_stats = 10;
 }

 message SearchPlanResponse {
@@ -355,12 +360,282 @@ message LeafSearchRequest {
   repeated string index_uris = 9;
 }

+// Per-leaf execution stats for a single leaf search response.
+//
+// All fields are cheap to compute (counters and `Instant::now()` calls
+// already on the hot path). Aggregated by the root into `RootResourceStats`
+// — see that message for the request-scoped view and the diagnostic
+// playbook.
 message ResourceStats {
+  // Bytes pulled into the per-request short-lived cache during warmup.
+  // Proxy for "bytes downloaded from object storage" by this leaf.
+  //
+  // The proxy is inaccurate because some of the data may already have
+  // been in cache.
   uint64 short_lived_cache_num_bytes = 1;
+  // Total number of documents in the splits the leaf considered for
+  // search (after timestamp/tag pruning).
   uint64 split_num_docs = 2;
+  // Wall time spent in the warmup phase (downloading metadata + hot bytes).
   uint64 warmup_microsecs = 3;
+  // Time spent queued waiting for a permit on the search CPU thread pool.
+  // Reflects pool saturation, not the cost of the query itself.
   uint64 cpu_thread_pool_wait_microsecs = 4;
+  // CPU time spent on the search CPU thread pool — matching, scoring,
+  // top-k collection, and aggregation collection combined.
   uint64 cpu_microsecs = 5;
+  // CPU time spent specifically inside the aggregation collector,
+  // measured around `AggregationSegmentCollector::collect_block` calls.
+  // Zero when the request has no aggregation.
+  uint64 aggregation_cpu_microsecs = 6;
+  // Wall clock time for the entire leaf request, measured from the
+  // entry of `multi_index_leaf_search` to the moment the merged
+  // response is ready to return. Includes I/O permit waits, warmup
+  // (parallel across splits), CPU permit queueing, search execution,
+  // and the per-leaf merge.
+  //
+  // This is set ONCE per leaf request — the per-split path does not
+  // populate it, so `merge_resource_stats` summing across per-split
+  // responses leaves it at zero, and the leaf-level handler then
+  // overwrites it with the actual wall time.
+  //
+  // The root takes the max and sum of this field across leaves to
+  // populate `RootResourceStats.max_leaf_wall_microsecs` and
+  // `sum_leaf_wall_microsecs` — i.e. the slowest leaf and the basis
+  // for an imbalance ratio.
+  uint64 wall_microsecs = 7;
+}
+
+// =============================================================================
+// RootResourceStats — execution telemetry for a single search request.
+// ============================================================================= +// +// Aggregate of cheap-to-compute counters and timers, summed/maxed across all +// leaf responses participating in a search, plus a few root-side measurements +// (merge time, leaf count). The intent is to give an operator (or an AI agent) +// enough signal to attribute slowness to a specific stage of the search +// pipeline and to estimate whether the implementation has headroom. +// +// This is the request-scoped sibling of the per-leaf `ResourceStats`. Where +// `ResourceStats` describes one leaf's view of one or more splits, +// `RootResourceStats` describes the whole request as observed from the root. +// +// ----------------------------------------------------------------------------- +// Quickwit search execution model (one request, end-to-end) +// ----------------------------------------------------------------------------- +// +// A search request flows through the following stages. Each stage has at +// least one field below that exposes its cost. +// +// 1. Root planning. The root server resolves the index metadata, picks the +// splits to search (timestamp/tag pruning), groups them by leaf, and +// issues a `LeafSearchRequest` to every leaf. The cost of this phase is +// *not* covered by RootResourceStats today (typically negligible). +// +// 2. Leaf-side, per-split execution. Each leaf processes its assigned +// splits concurrently. Per split the lifecycle is: +// a. Acquire an I/O permit (admission control on concurrent +// downloads). +// b. WARMUP — download all required data from object storage (split +// metadata, posting list headers, term dictionary blocks needed +// by the query, fast-field headers). These bytes land in a +// per-request "short-lived cache" so subsequent reads are local. +// c. Acquire a CPU permit on the search CPU thread pool. Until a +// permit is granted the task is *queued* — this queueing time is +// recorded as `cpu_thread_pool_wait_microsecs`. +// d. Execute the query on the search CPU thread pool: matching/scoring, +// top-k collection, and (if requested) aggregation. CPU time +// spent here is `cpu_microsecs`. +// Time spent specifically inside the aggregation collector is +// `aggregation_cpu_microsecs`. +// The thread pool is sized to match the number of hyperthreads +// available, so that this is very close to the actual CPU usage. +// e. Return a partial result. +// +// 3. Leaf-side merge. Each leaf merges its per-split partial results +// into a single `LeafSearchResponse` (top-k merge, aggregation tree +// merge). +// +// 4. Root merge. The root collects every `LeafSearchResponse` and merges +// them into the final `SearchResponse`. The wall time of this stage +// is `root_merge_microsecs`. 
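+//
+// For orientation, a sketch of how these stats surface in the JSON of a
+// REST search response. The field names are the real fields of this
+// message; the numbers are made up for illustration:
+//
+//   "resource_stats": {
+//     "total_cpu_microsecs": 2400000,
+//     "total_warmup_microsecs": 180000,
+//     "total_cpu_thread_pool_wait_microsecs": 15000,
+//     "total_aggregation_cpu_microsecs": 1900000,
+//     "total_short_lived_cache_num_bytes": 52428800,
+//     "total_split_num_docs": 120000000,
+//     "total_num_docs_matched": 3400000,
+//     "num_leaf_responses": 4,
+//     "max_leaf_wall_microsecs": 900000,
+//     "sum_leaf_wall_microsecs": 1250000,
+//     "root_merge_microsecs": 30000
+//   }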
+// +// ----------------------------------------------------------------------------- +// Interpreting these fields — diagnostic playbook +// ----------------------------------------------------------------------------- +// +// Let: +// wall := elapsed_time_micros (top-level field on SearchResponse) +// cpu := total_cpu_microsecs +// wait := total_cpu_thread_pool_wait_microsecs +// warmup := total_warmup_microsecs +// agg_cpu := total_aggregation_cpu_microsecs +// max_leaf := max_leaf_wall_microsecs +// avg_leaf := sum_leaf_wall_microsecs / num_leaf_responses +// downloaded := total_short_lived_cache_num_bytes +// docs_in_split := total_split_num_docs +// docs_matched := total_num_docs_matched +// +// Suggested rules of thumb (orient triage, not strict thresholds): +// +// * CPU-bound query: cpu >~ 0.5 * (wall * num_search_cpu_threads) +// and warmup small relative to wall. +// => look at the query plan; check for +// unselective predicates, range queries on +// tokenized fields, phrase queries with +// frequent terms. +// +// * I/O-bound query: warmup >~ 0.5 * wall, or `downloaded` is large +// (hundreds of MB / GBs). +// => cold cache, large fast fields, many splits, +// or first-time access patterns. Check whether +// a different fast-field layout or a smaller +// time range would help. +// +// * Cluster overloaded: wait >~ cpu (waiting longer than computing). +// => the search CPU pool is saturated by other +// requests; the query itself may be fine. +// Re-run in isolation to confirm. +// +// * Aggregation-heavy: agg_cpu >~ 0.5 * cpu. +// => aggregation is the bottleneck, not matching. +// Look at bucket cardinality, sub-aggregations, +// or unbounded `terms` aggregations. +// +// * Imbalance across leaves: (max_leaf - avg_leaf) / avg_leaf >~ 1. +// => request fan-out is uneven. Common causes: +// one leaf has many more splits, one leaf has +// a colder cache, or other workload is +// starving one leaf's CPU pool. +// +// * Too much work: docs_matched >~ docs_in_split * 0.5. +// => query is barely selective; consider tighter +// filters, time pruning, or sample-based +// approaches. +// +// * Headroom estimate: wall * num_search_cpu_threads - cpu - wait +// ~ idle CPU capacity unused by this request. +// If positive and large, parallelism is the +// limiting factor (more splits, more leaves). +// If near zero, we are at compute saturation. +// +// All values are sums/maxes over leaf responses unless noted. They are +// designed to be cheap (counters and `Instant::now()` calls already in the +// hot path) so they can be returned with every search response. +// ----------------------------------------------------------------------------- +message RootResourceStats { + // ------------------------------------------------------------------------- + // Sums across leaf responses + // ------------------------------------------------------------------------- + + // Sum of `cpu_microsecs` reported by every leaf — total wall time + // spent on the search CPU thread pool, summed across all leaves. + // + // Compare against `wall * num_search_cpu_threads_per_leaf * + // num_leaf_responses` to estimate CPU utilization. A value close to + // that upper bound means the request was compute-saturated. + // + // Includes time spent matching, scoring, top-k collection, and + // aggregation collection. Does NOT include warmup or queueing. + uint64 total_cpu_microsecs = 1; + + // Sum of `warmup_microsecs` across leaves. 
+ // + // Warmup is the phase where the leaf downloads, from object storage, + // the bytes it needs to evaluate the query (split metadata, posting + // list / dictionary blocks, fast-field headers). It runs before the + // query is dispatched to the search CPU pool. + // + // High values relative to `total_cpu_microsecs` signal an I/O-bound + // request — cold short-lived cache, large fast fields, high split + // count, or cross-region object storage latency. + uint64 total_warmup_microsecs = 2; + + // Sum of `cpu_thread_pool_wait_microsecs` across leaves. + // + // Time leaves spent *queued*, waiting for a permit on the search CPU + // thread pool after warmup completed. This is admission-control + // queueing, not work. + // + // If this is comparable to or larger than `total_cpu_microsecs`, the + // cluster is CPU-saturated by other workloads — the query itself may + // be fine. Re-run in isolation to confirm. + uint64 total_cpu_thread_pool_wait_microsecs = 3; + + // CPU time spent specifically inside the aggregation collector, + // summed across leaves. Zero when the request has no aggregation. + // + // Measured by sampling `Instant::now()` around each call to + // `AggregationSegmentCollector::collect_block` (block size <= 2048 + // docs). Overhead is well below noise (~0.15 ns per matched doc). + // + // Compare to `total_cpu_microsecs` to get the fraction of CPU spent + // in aggregation. A high fraction (>~ 50%) points at heavy + // bucketization (terms with high cardinality, deep sub-aggregations) + // rather than at predicate matching. + uint64 total_aggregation_cpu_microsecs = 4; + + // Sum of `short_lived_cache_num_bytes` across leaves — the size of + // the per-request hot-byte cache populated during warmup. A reasonable + // proxy for "GB downloaded from object storage for this query". + // + // Useful to flag I/O-heavy queries even when `total_warmup_microsecs` + // is hidden by parallelism (many splits, each downloading concurrently). + uint64 total_short_lived_cache_num_bytes = 5; + + // Sum of `split_num_docs` across leaves — the total number of + // documents in all splits searched, after timestamp/tag pruning at + // the root and any further pruning at the leaf. + // + // This is the size of the haystack the query had to traverse. Pair + // with `total_num_docs_matched` to gauge selectivity. + uint64 total_split_num_docs = 9; + + // Sum across leaves of the number of docs that matched the query + // (i.e. were passed to the collector via `collect_block` / + // `collect`). For non-aggregation queries this equals the global + // `num_hits` on `SearchResponse`; for aggregation queries it is the + // raw match count, which `num_hits` may not surface. + // + // Selectivity = `total_num_docs_matched / total_split_num_docs`. + // A value near 1 means the query is barely filtering and the cost + // is dominated by traversal of essentially every doc. + uint64 total_num_docs_matched = 10; + + // ------------------------------------------------------------------------- + // Aggregates over leaves (for imbalance detection) + // ------------------------------------------------------------------------- + + // Number of leaf responses folded into this stats object. Useful as + // the denominator when computing per-leaf averages from the sums above. + uint64 num_leaf_responses = 6; + + // Wall time of the slowest leaf. This is a tight lower bound on the + // request's wall time (modulo root merge) because the root must wait + // for every leaf. 
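+  //
+  // For example, with four leaves reporting wall times of 100 ms, 120 ms,
+  // 130 ms and 900 ms (illustrative numbers), the max is 900 ms against an
+  // average of ~312 ms: the request is gated by one straggler, not by
+  // overall capacity.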
+  //
+  // If `max_leaf_wall_microsecs ~= elapsed_time_micros`, optimization
+  // effort should focus on the slowest leaf, not on parallelism.
+  uint64 max_leaf_wall_microsecs = 7;
+
+  // Sum of leaf wall times across all responding leaves. Combined with
+  // `num_leaf_responses` and `max_leaf_wall_microsecs`, lets the caller
+  // derive the imbalance ratio:
+  //   avg = sum_leaf_wall_microsecs / num_leaf_responses
+  //   imbalance = (max_leaf_wall_microsecs - avg) / avg
+  // A high imbalance indicates skew — uneven split assignment, cold
+  // cache on one leaf, or competing workload pinning a single leaf's
+  // CPU pool.
+  uint64 sum_leaf_wall_microsecs = 8;
+
+  // -------------------------------------------------------------------------
+  // Root-side instrumentation
+  // -------------------------------------------------------------------------
+
+  // Wall time spent in the root's merge step — combining leaf top-k
+  // results and intermediate aggregation results into the final
+  // `SearchResponse`. Typically small for plain searches; non-trivial
+  // for heavy aggregations (large bucket trees, deep sub-aggregations).
+  uint64 root_merge_microsecs = 11;
 }

 // LeafRequestRef references data in LeafSearchRequest to deduplicate data.
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
index 16c11358ab8..0839e71ec8d 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
@@ -240,6 +240,11 @@ pub struct SearchResponse {
     /// Total number of successful splits searched.
     #[prost(uint64, tag = "8")]
     pub num_successful_splits: u64,
+    /// Per-request execution telemetry. See `RootResourceStats` for the
+    /// execution model and a diagnostic playbook. Optional for backwards
+    /// compatibility — older servers / clients may omit this.
+    #[prost(message, optional, tag = "10")]
+    pub resource_stats: ::core::option::Option<RootResourceStats>,
 }
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
@@ -281,19 +286,273 @@ pub struct LeafSearchRequest {
     #[prost(string, repeated, tag = "9")]
     pub index_uris: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
 }
+/// Per-leaf execution stats for a single leaf search response.
+///
+/// All fields are cheap to compute (counters and `Instant::now()` calls
+/// already on the hot path). Aggregated by the root into `RootResourceStats`
+/// — see that message for the request-scoped view and the diagnostic
+/// playbook.
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
 pub struct ResourceStats {
+    /// Bytes pulled into the per-request short-lived cache during warmup.
+    /// Proxy for "bytes downloaded from object storage" by this leaf.
     #[prost(uint64, tag = "1")]
     pub short_lived_cache_num_bytes: u64,
+    /// Total number of documents in the splits the leaf considered for
+    /// search (after timestamp/tag pruning).
     #[prost(uint64, tag = "2")]
     pub split_num_docs: u64,
+    /// Wall time spent in the warmup phase (downloading metadata + hot bytes).
     #[prost(uint64, tag = "3")]
     pub warmup_microsecs: u64,
+    /// Time spent queued waiting for a permit on the search CPU thread pool.
+    /// Reflects pool saturation, not the cost of the query itself.
     #[prost(uint64, tag = "4")]
     pub cpu_thread_pool_wait_microsecs: u64,
+    /// CPU time spent on the search CPU thread pool — matching, scoring,
+    /// top-k collection, and aggregation collection combined.
     #[prost(uint64, tag = "5")]
     pub cpu_microsecs: u64,
+    /// CPU time spent specifically inside the aggregation collector,
+    /// measured around `AggregationSegmentCollector::collect_block` calls.
+    /// Zero when the request has no aggregation.
+    #[prost(uint64, tag = "6")]
+    pub aggregation_cpu_microsecs: u64,
+    /// Wall clock time for the entire leaf request, measured from the
+    /// entry of `multi_index_leaf_search` to the moment the merged
+    /// response is ready to return. Includes I/O permit waits, warmup
+    /// (parallel across splits), CPU permit queueing, search execution,
+    /// and the per-leaf merge.
+    ///
+    /// This is set ONCE per leaf request — the per-split path does not
+    /// populate it, so it stays at zero through the per-split merge, and
+    /// the leaf-level handler then overwrites it with the actual wall time.
+    ///
+    /// The root takes the max and sum of this field across leaves to
+    /// populate `RootResourceStats.max_leaf_wall_microsecs` and
+    /// `sum_leaf_wall_microsecs` — i.e. the slowest leaf and the basis
+    /// for an imbalance ratio.
+    #[prost(uint64, tag = "7")]
+    pub wall_microsecs: u64,
+}
+/// RootResourceStats — execution telemetry for a single search request.
+///
+/// Aggregate of cheap-to-compute counters and timers, summed/maxed across all
+/// leaf responses participating in a search, plus a few root-side measurements
+/// (merge time, leaf count). The intent is to give an operator (or an AI agent)
+/// enough signal to attribute slowness to a specific stage of the search
+/// pipeline and to estimate whether the implementation has headroom.
+///
+/// This is the request-scoped sibling of the per-leaf `ResourceStats`. Where
+/// `ResourceStats` describes one leaf's view of one or more splits,
+/// `RootResourceStats` describes the whole request as observed from the root.
+///
+/// ---
+///
+/// ## Quickwit search execution model (one request, end-to-end)
+///
+/// A search request flows through the following stages. Each stage has at
+/// least one field below that exposes its cost.
+///
+/// 1. Root planning. The root server resolves the index metadata, picks the
+///    splits to search (timestamp/tag pruning), groups them by leaf, and
+///    issues a `LeafSearchRequest` to every leaf. The cost of this phase is
+///    *not* covered by RootResourceStats today (typically negligible).
+///
+/// 2. Leaf-side, per-split execution. Each leaf processes its assigned
+///    splits concurrently. Per split the lifecycle is:
+///    a. Acquire an I/O permit (admission control on concurrent
+///       downloads).
+///    b. WARMUP — download all required data from object storage (split
+///       metadata, posting list headers, term dictionary blocks needed
+///       by the query, fast-field headers). These bytes land in a
+///       per-request "short-lived cache" so subsequent reads are local.
+///    c. Acquire a CPU permit on the search CPU thread pool. Until a
+///       permit is granted the task is *queued* — this queueing time is
+///       recorded as `cpu_thread_pool_wait_microsecs`.
+///    d. Execute the query on the search CPU thread pool: matching/scoring,
+///       top-k collection, and (if requested) aggregation. CPU time
+///       spent here is `cpu_microsecs`. Time spent specifically inside
+///       the aggregation collector is `aggregation_cpu_microsecs`.
+///       The thread pool is sized to match the number of hyperthreads
+///       available, so this is very close to the actual CPU usage.
+///    e. Return a partial result.
+///
+/// 3. Leaf-side merge. Each leaf merges its per-split partial results
+///    into a single `LeafSearchResponse` (top-k merge, aggregation tree
+///    merge).
+///
+/// 4. Root merge. The root collects every `LeafSearchResponse` and merges
+///    them into the final `SearchResponse`. The wall time of this stage
+///    is `root_merge_microsecs`.
+///
+/// ---
+///
+/// ## Interpreting these fields — diagnostic playbook
+///
+/// Let:
+///   wall          := elapsed_time_micros (top-level field on SearchResponse)
+///   cpu           := total_cpu_microsecs
+///   wait          := total_cpu_thread_pool_wait_microsecs
+///   warmup        := total_warmup_microsecs
+///   agg_cpu       := total_aggregation_cpu_microsecs
+///   max_leaf      := max_leaf_wall_microsecs
+///   avg_leaf      := sum_leaf_wall_microsecs / num_leaf_responses
+///   downloaded    := total_short_lived_cache_num_bytes
+///   docs_in_split := total_split_num_docs
+///   docs_matched  := total_num_docs_matched
+///
+/// Suggested rules of thumb (these orient triage, they are not strict
+/// thresholds):
+///
+/// * CPU-bound query: cpu >~ 0.5 * (wall * num_search_cpu_threads)
+///   and warmup small relative to wall.
+///   => look at the query plan; check for unselective predicates,
+///      range queries on tokenized fields, phrase queries with
+///      frequent terms.
+///
+/// * I/O-bound query: warmup >~ 0.5 * wall, or `downloaded` is large
+///   (hundreds of MB / GBs).
+///   => cold cache, large fast fields, many splits, or first-time
+///      access patterns. Check whether a different fast-field layout
+///      or a smaller time range would help.
+///
+/// * Cluster overloaded: wait >~ cpu (waiting longer than computing).
+///   => the search CPU pool is saturated by other requests; the query
+///      itself may be fine. Re-run in isolation to confirm.
+///
+/// * Aggregation-heavy: agg_cpu >~ 0.5 * cpu.
+///   => aggregation is the bottleneck, not matching. Look at bucket
+///      cardinality, sub-aggregations, or unbounded `terms`
+///      aggregations.
+///
+/// * Imbalance across leaves: (max_leaf - avg_leaf) / avg_leaf >~ 1.
+///   => request fan-out is uneven. Common causes: one leaf has many
+///      more splits, one leaf has a colder cache, or other workload is
+///      starving one leaf's CPU pool.
+///
+/// * Too much work: docs_matched >~ docs_in_split * 0.5.
+///   => query is barely selective; consider tighter filters, time
+///      pruning, or sample-based approaches.
+///
+/// * Headroom estimate: wall * num_search_cpu_threads - cpu - wait
+///   ~ idle CPU capacity unused by this request. If positive and
+///   large, parallelism is the limiting factor (more splits, more
+///   leaves). If near zero, we are at compute saturation.
+///
+/// All values are sums/maxes over leaf responses unless noted. They are
+/// designed to be cheap (counters and `Instant::now()` calls already in the
+/// hot path) so they can be returned with every search response.
+#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
+#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
+pub struct RootResourceStats {
+    /// Sum of `cpu_microsecs` reported by every leaf — total wall time
+    /// spent on the search CPU thread pool, summed across all leaves.
+    ///
+    /// Compare against `wall * num_search_cpu_threads_per_leaf *
+    /// num_leaf_responses` to estimate CPU utilization. A value close to
+    /// that upper bound means the request was compute-saturated.
+    ///
+    /// Includes time spent matching, scoring, top-k collection, and
+    /// aggregation collection. Does NOT include warmup or queueing.
+ #[prost(uint64, tag = "1")] + pub total_cpu_microsecs: u64, + /// Sum of `warmup_microsecs` across leaves. + /// + /// Warmup is the phase where the leaf downloads, from object storage, + /// the bytes it needs to evaluate the query (split metadata, posting + /// list / dictionary blocks, fast-field headers). It runs before the + /// query is dispatched to the search CPU pool. + /// + /// High values relative to `total_cpu_microsecs` signal an I/O-bound + /// request — cold short-lived cache, large fast fields, high split + /// count, or cross-region object storage latency. + #[prost(uint64, tag = "2")] + pub total_warmup_microsecs: u64, + /// Sum of `cpu_thread_pool_wait_microsecs` across leaves. + /// + /// Time leaves spent *queued*, waiting for a permit on the search CPU + /// thread pool after warmup completed. This is admission-control + /// queueing, not work. + /// + /// If this is comparable to or larger than `total_cpu_microsecs`, the + /// cluster is CPU-saturated by other workloads — the query itself may + /// be fine. Re-run in isolation to confirm. + #[prost(uint64, tag = "3")] + pub total_cpu_thread_pool_wait_microsecs: u64, + /// CPU time spent specifically inside the aggregation collector, + /// summed across leaves. Zero when the request has no aggregation. + /// + /// Measured by sampling `Instant::now()` around each call to + /// `AggregationSegmentCollector::collect_block` (block size \<= 2048 + /// docs). Overhead is well below noise (~0.15 ns per matched doc). + /// + /// Compare to `total_cpu_microsecs` to get the fraction of CPU spent + /// in aggregation. A high fraction (>~ 50%) points at heavy + /// bucketization (terms with high cardinality, deep sub-aggregations) + /// rather than at predicate matching. + #[prost(uint64, tag = "4")] + pub total_aggregation_cpu_microsecs: u64, + /// Sum of `short_lived_cache_num_bytes` across leaves — the size of + /// the per-request hot-byte cache populated during warmup. A reasonable + /// proxy for "GB downloaded from object storage for this query". + /// + /// Useful to flag I/O-heavy queries even when `total_warmup_microsecs` + /// is hidden by parallelism (many splits, each downloading concurrently). + #[prost(uint64, tag = "5")] + pub total_short_lived_cache_num_bytes: u64, + /// Sum of `split_num_docs` across leaves — the total number of + /// documents in all splits searched, after timestamp/tag pruning at + /// the root and any further pruning at the leaf. + /// + /// This is the size of the haystack the query had to traverse. Pair + /// with `total_num_docs_matched` to gauge selectivity. + #[prost(uint64, tag = "9")] + pub total_split_num_docs: u64, + /// Sum across leaves of the number of docs that matched the query + /// (i.e. were passed to the collector via `collect_block` / + /// `collect`). For non-aggregation queries this equals the global + /// `num_hits` on `SearchResponse`; for aggregation queries it is the + /// raw match count, which `num_hits` may not surface. + /// + /// Selectivity = `total_num_docs_matched / total_split_num_docs`. + /// A value near 1 means the query is barely filtering and the cost + /// is dominated by traversal of essentially every doc. + #[prost(uint64, tag = "10")] + pub total_num_docs_matched: u64, + /// Number of leaf responses folded into this stats object. Useful as + /// the denominator when computing per-leaf averages from the sums above. + #[prost(uint64, tag = "6")] + pub num_leaf_responses: u64, + /// Wall time of the slowest leaf. 
This is a tight lower bound on the
+    /// request's wall time (modulo root merge) because the root must wait
+    /// for every leaf.
+    ///
+    /// If `max_leaf_wall_microsecs ~= elapsed_time_micros`, optimization
+    /// effort should focus on the slowest leaf, not on parallelism.
+    #[prost(uint64, tag = "7")]
+    pub max_leaf_wall_microsecs: u64,
+    /// Sum of leaf wall times across all responding leaves. Combined with
+    /// `num_leaf_responses` and `max_leaf_wall_microsecs`, lets the caller
+    /// derive the imbalance ratio:
+    ///   avg = sum_leaf_wall_microsecs / num_leaf_responses
+    ///   imbalance = (max_leaf_wall_microsecs - avg) / avg
+    /// A high imbalance indicates skew — uneven split assignment, cold
+    /// cache on one leaf, or competing workload pinning a single leaf's
+    /// CPU pool.
+    #[prost(uint64, tag = "8")]
+    pub sum_leaf_wall_microsecs: u64,
+    /// Wall time spent in the root's merge step — combining leaf top-k
+    /// results and intermediate aggregation results into the final
+    /// `SearchResponse`. Typically small for plain searches; non-trivial
+    /// for heavy aggregations (large bucket trees, deep sub-aggregations).
+    #[prost(uint64, tag = "11")]
+    pub root_merge_microsecs: u64,
 }
 /// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs
index ed21fd968ba..18a071246cb 100644
--- a/quickwit/quickwit-search/src/collector.rs
+++ b/quickwit/quickwit-search/src/collector.rs
@@ -15,6 +15,7 @@ use std::borrow::Cow;
 use std::cmp::Ordering;
 use std::collections::HashSet;
+use std::time::Instant;
 use itertools::Itertools;
 use quickwit_common::binary_heap::{SortKeyMapper, TopK};
@@ -477,6 +478,16 @@ pub struct QuickwitSegmentCollector {
     segment_top_k_collector: Option>,
     aggregation: Option<AggregationSegmentCollectors>,
     num_hits: u64,
+    /// Cumulative wall time spent inside the aggregation segment collector,
+    /// measured around `collect_block` / `collect` calls. Tracked in
+    /// nanoseconds because per-block work (typically <= 2048 docs) often
+    /// completes in tens of microseconds, and we want sub-microsecond
+    /// accuracy. The value is converted to microseconds when surfaced
+    /// in `ResourceStats.aggregation_cpu_microsecs`.
+    ///
+    /// Cost of the timing itself: one `Instant::now()` pair per block,
+    /// i.e. ~2 * 150 ns / 2048 docs ~ 0.15 ns/doc — well below noise.
+    aggregation_cpu_nanos: u64,
 }

 #[derive(Copy, Clone, Debug)]
@@ -533,32 +544,53 @@ impl SegmentCollector for QuickwitSegmentCollector {
             segment_top_k_collector.collect_top_k_block(filtered_docs);
         }

-        match self.aggregation.as_mut() {
-            Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => {
-                collector.collect_block(filtered_docs)
-            }
-            Some(AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector)) => {
-                collector.collect_block(filtered_docs)
+        // Time spent inside the aggregation collector is tracked separately
+        // so that `RootResourceStats.total_aggregation_cpu_microsecs` can be
+        // compared against `total_cpu_microsecs` to identify aggregation-heavy
+        // queries. The hot path is `collect_block` (block size <= 2048).
+ if let Some(aggregation) = self.aggregation.as_mut() { + let agg_start = Instant::now(); + match aggregation { + AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector) => { + collector.collect_block(filtered_docs); + } + AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector) => { + collector.collect_block(filtered_docs); + } } - None => (), + self.aggregation_cpu_nanos += agg_start.elapsed().as_nanos() as u64; } } #[inline] fn collect(&mut self, doc_id: DocId, score: Score) { + // Tantivy's dispatch (`Collector::collect_segment` default impl) + // takes this per-doc path whenever scoring is required (e.g. + // sort by `_score`, BM25 ranking) or the segment has deleted + // docs — see `tantivy/src/collector/mod.rs`. It is therefore + // exercised by any score-sorted Quickwit query. + // + // We deliberately do NOT time the aggregation here: at ~300 ns + // per `Instant::now()` pair amortized over a single doc, the + // overhead would dwarf the work we're trying to measure. The + // `collect_block` hook is enough to characterize aggregation + // cost in the common (non-scored aggregation) case; for scored + // aggregation queries `aggregation_cpu_microsecs` will be a + // slight underestimate. self.num_hits += 1; if let Some(segment_top_k_collector) = self.segment_top_k_collector.as_mut() { segment_top_k_collector.collect_top_k(doc_id, score); } - match self.aggregation.as_mut() { - Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => { - collector.collect(doc_id, score) - } - Some(AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector)) => { - collector.collect(doc_id, score) + if let Some(aggregation) = self.aggregation.as_mut() { + match aggregation { + AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector) => { + collector.collect(doc_id, score); + } + AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector) => { + collector.collect(doc_id, score); + } } - None => (), } } @@ -583,6 +615,19 @@ impl SegmentCollector for QuickwitSegmentCollector { None => None, }; + // The per-segment fruit carries `aggregation_cpu_microsecs` only. + // All other resource-stats fields are populated at the leaf level + // (see `leaf_search_single_split`); merging at `merge_leaf_responses` + // sums the per-segment aggregation CPU into a per-split value. + let resource_stats = if self.aggregation_cpu_nanos > 0 { + Some(ResourceStats { + aggregation_cpu_microsecs: self.aggregation_cpu_nanos / 1_000, + ..ResourceStats::default() + }) + } else { + None + }; + Ok(LeafSearchResponse { intermediate_aggregation_result, num_hits: self.num_hits, @@ -590,7 +635,7 @@ impl SegmentCollector for QuickwitSegmentCollector { failed_splits: Vec::new(), num_attempted_splits: 1, num_successful_splits: 1, - resource_stats: None, + resource_stats, }) } } @@ -814,6 +859,7 @@ impl Collector for QuickwitCollector { num_hits: 0, segment_top_k_collector, aggregation, + aggregation_cpu_nanos: 0, }) } diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index ba206889841..c9e80fd4994 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -630,6 +630,15 @@ async fn leaf_search_single_split( } else { searcher.search(&query, &collector)? }; + // The aggregation collector populates its own `resource_stats` + // with `aggregation_cpu_microsecs` summed across segments + // (see `QuickwitSegmentCollector::harvest`). 
Pull it out here
+            // so we don't lose it when we overwrite resource_stats below.
+            let aggregation_cpu_microsecs = leaf_search_response
+                .resource_stats
+                .as_ref()
+                .map(|stats| stats.aggregation_cpu_microsecs)
+                .unwrap_or(0);
             leaf_search_response.resource_stats = Some(ResourceStats {
                 cpu_microsecs: cpu_start.elapsed().as_micros() as u64,
                 short_lived_cache_num_bytes: warmup_size.as_u64(),
@@ -637,6 +646,12 @@
                 warmup_microsecs: warmup_duration.as_micros() as u64,
                 cpu_thread_pool_wait_microsecs: cpu_thread_pool_wait_microsecs.as_micros()
                     as u64,
+                aggregation_cpu_microsecs,
+                // wall_microsecs is set once per leaf request at the
+                // `multi_index_leaf_search` boundary, not per split —
+                // summing per-split wall times would overcount when
+                // splits run concurrently.
+                wall_microsecs: 0,
             });
             leaf_search_state_guard.set_state(SplitSearchState::Success);
             Result::<_, TantivyError>::Ok(Some((
@@ -1248,6 +1263,12 @@ pub async fn multi_index_leaf_search(
     leaf_search_request: LeafSearchRequest,
     storage_resolver: StorageResolver,
 ) -> Result<LeafSearchResponse> {
+    // Wall time for the leaf request: from receipt of the LeafSearchRequest
+    // (after gRPC decode) to the moment the merged response is ready to send
+    // back. This is the leaf's actual response time as observed locally,
+    // including I/O permit waits, warmup across all splits, CPU permit
+    // waits, search work, and the per-leaf merge.
+    let wall_start = Instant::now();
     let search_request: Arc<SearchRequest> = leaf_search_request
         .search_request
         .ok_or_else(|| SearchError::Internal("no search request".to_string()))?
@@ -1331,11 +1352,26 @@
         }
     }

-    crate::search_thread_pool()
-        .run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into))
+    let mut leaf_search_response: LeafSearchResponse = crate::search_thread_pool()
+        .run_cpu_intensive(|| -> crate::Result<LeafSearchResponse> {
+            incremental_merge_collector.finalize().map_err(Into::into)
+        })
         .instrument(info_span!("incremental_merge_finalize"))
         .await
-        .context("failed to merge split search responses")?
+        .context("failed to merge split search responses")??;
+    // Record the leaf wall time. If the leaf processed only metadata-only
+    // count splits, no `ResourceStats` was created — we synthesize a
+    // minimal one so the root still sees a wall time.
+ let wall_microsecs = wall_start.elapsed().as_micros() as u64; + if let Some(resource_stats) = leaf_search_response.resource_stats.as_mut() { + resource_stats.wall_microsecs = wall_microsecs; + } else { + leaf_search_response.resource_stats = Some(ResourceStats { + wall_microsecs, + ..ResourceStats::default() + }); + } + Ok(leaf_search_response) } /// Optimizes the search_request based on CanSplitDoBetter diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index f7989efc3ab..0d0e8c692fd 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -367,6 +367,8 @@ fn merge_resource_stats( stat_accs.warmup_microsecs += new_stats.warmup_microsecs; stat_accs.cpu_thread_pool_wait_microsecs += new_stats.cpu_thread_pool_wait_microsecs; stat_accs.cpu_microsecs += new_stats.cpu_microsecs; + stat_accs.aggregation_cpu_microsecs += new_stats.aggregation_cpu_microsecs; + stat_accs.wall_microsecs += new_stats.wall_microsecs; } else { *stat_accs_opt = Some(*new_stats); } @@ -390,6 +392,8 @@ mod stats_merge_tests { warmup_microsecs: 300, cpu_thread_pool_wait_microsecs: 400, cpu_microsecs: 500, + aggregation_cpu_microsecs: 0, + wall_microsecs: 0, }); merge_resource_stats(&stats, &mut acc_stats); @@ -402,6 +406,8 @@ mod stats_merge_tests { warmup_microsecs: 150, cpu_thread_pool_wait_microsecs: 200, cpu_microsecs: 250, + aggregation_cpu_microsecs: 0, + wall_microsecs: 0, }); merge_resource_stats(&new_stats, &mut acc_stats); @@ -412,6 +418,8 @@ mod stats_merge_tests { warmup_microsecs: 450, cpu_thread_pool_wait_microsecs: 600, cpu_microsecs: 750, + aggregation_cpu_microsecs: 0, + wall_microsecs: 0, }); assert_eq!(acc_stats, stats_plus_new_stats); @@ -432,6 +440,8 @@ mod stats_merge_tests { warmup_microsecs: 300, cpu_thread_pool_wait_microsecs: 400, cpu_microsecs: 500, + aggregation_cpu_microsecs: 0, + wall_microsecs: 0, }); let merged_stats = merge_resource_stats_it(vec![&None, &stats1, &None]); @@ -444,6 +454,8 @@ mod stats_merge_tests { warmup_microsecs: 150, cpu_thread_pool_wait_microsecs: 200, cpu_microsecs: 250, + aggregation_cpu_microsecs: 0, + wall_microsecs: 0, }); let stats3 = Some(ResourceStats { @@ -452,6 +464,8 @@ mod stats_merge_tests { warmup_microsecs: 75, cpu_thread_pool_wait_microsecs: 100, cpu_microsecs: 125, + aggregation_cpu_microsecs: 0, + wall_microsecs: 0, }); let merged_stats = merge_resource_stats_it(vec![&stats1, &stats2, &stats3]); @@ -464,6 +478,8 @@ mod stats_merge_tests { warmup_microsecs: 525, cpu_thread_pool_wait_microsecs: 700, cpu_microsecs: 875, + aggregation_cpu_microsecs: 0, + wall_microsecs: 0, }) ); } diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 9bbb5f4052c..6d53dfeb635 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -32,8 +32,9 @@ use quickwit_proto::metastore::{ }; use quickwit_proto::search::{ FetchDocsRequest, FetchDocsResponse, Hit, LeafHit, LeafRequestRef, LeafSearchRequest, - LeafSearchResponse, PartialHit, SearchPlanResponse, SearchRequest, SearchResponse, - SnippetRequest, SortDatetimeFormat, SortField, SortValue, SplitIdAndFooterOffsets, + LeafSearchResponse, PartialHit, RootResourceStats, SearchPlanResponse, SearchRequest, + SearchResponse, SnippetRequest, SortDatetimeFormat, SortField, SortValue, + SplitIdAndFooterOffsets, }; use quickwit_proto::types::{IndexUid, SplitId}; use quickwit_query::query_ast::{ @@ -569,7 +570,11 @@ async fn search_partial_hits_phase_with_scroll( mut search_request: 
SearchRequest,
     split_metadatas: &[SplitMetadata],
     cluster_client: &ClusterClient,
-) -> crate::Result<(LeafSearchResponse, Option<ScrollKeyAndStartOffset>)> {
+) -> crate::Result<(
+    LeafSearchResponse,
+    Option<RootResourceStats>,
+    Option<ScrollKeyAndStartOffset>,
+)> {
     let scroll_ttl_opt = get_scroll_ttl_duration(&search_request)?;

     if let Some(scroll_ttl) = scroll_ttl_opt {
@@ -581,7 +586,7 @@
         .max_hits
         .max(shared_consts::SCROLL_BATCH_LEN as u64);
     search_request.scroll_ttl_secs = None;
-    let mut leaf_search_resp = search_partial_hits_phase(
+    let (mut leaf_search_resp, root_resource_stats) = search_partial_hits_phase(
         searcher_context,
         indexes_metas_for_leaf_search,
         &search_request,
@@ -624,9 +629,13 @@
     cluster_client
         .put_kv(&scroll_key, &payload, scroll_ttl)
         .await;
-    Ok((leaf_search_resp, Some(scroll_key_and_start_offset)))
+    Ok((
+        leaf_search_resp,
+        root_resource_stats,
+        Some(scroll_key_and_start_offset),
+    ))
 } else {
-    let leaf_search_resp = search_partial_hits_phase(
+    let (leaf_search_resp, root_resource_stats) = search_partial_hits_phase(
         searcher_context,
         indexes_metas_for_leaf_search,
         &search_request,
@@ -634,7 +643,7 @@
         cluster_client,
     )
     .await?;
-    Ok((leaf_search_resp, None))
+    Ok((leaf_search_resp, root_resource_stats, None))
 }
}

@@ -727,6 +736,44 @@ fn is_top_5pct_memory_intensive(num_bytes: u64, split_num_docs: u64) -> bool {
     is_memory_intensive
 }

+/// Folds the per-leaf execution stats from a slice of `LeafSearchResponse`
+/// into a single `RootResourceStats`. Sums most fields across leaves, and
+/// takes both the sum and the max of `wall_microsecs` so the slowest leaf
+/// surfaces and an imbalance ratio can be derived.
+///
+/// Returns `None` when no leaf reported any stats — this preserves the
+/// "no instrumentation available" case (e.g. metadata-only count
+/// requests) so the response field stays `None`.
+fn compute_root_resource_stats(
+    leaf_responses: &[LeafSearchResponse],
+) -> Option<RootResourceStats> {
+    let mut any_stats = false;
+    let mut stats = RootResourceStats {
+        num_leaf_responses: leaf_responses.len() as u64,
+        ..RootResourceStats::default()
+    };
+    let mut total_num_docs_matched = 0u64;
+    for leaf_response in leaf_responses {
+        total_num_docs_matched += leaf_response.num_hits;
+        let Some(leaf_stats) = leaf_response.resource_stats.as_ref() else {
+            continue;
+        };
+        any_stats = true;
+        stats.total_cpu_microsecs += leaf_stats.cpu_microsecs;
+        stats.total_warmup_microsecs += leaf_stats.warmup_microsecs;
+        stats.total_cpu_thread_pool_wait_microsecs += leaf_stats.cpu_thread_pool_wait_microsecs;
+        stats.total_aggregation_cpu_microsecs += leaf_stats.aggregation_cpu_microsecs;
+        stats.total_short_lived_cache_num_bytes += leaf_stats.short_lived_cache_num_bytes;
+        stats.total_split_num_docs += leaf_stats.split_num_docs;
+        stats.sum_leaf_wall_microsecs += leaf_stats.wall_microsecs;
+        if leaf_stats.wall_microsecs > stats.max_leaf_wall_microsecs {
+            stats.max_leaf_wall_microsecs = leaf_stats.wall_microsecs;
+        }
+    }
+    if !any_stats {
+        return None;
+    }
+    stats.total_num_docs_matched = total_num_docs_matched;
+    Some(stats)
+}
+
 /// If this method fails for some splits, a partial search response is returned, with the list of
 /// faulty splits in the failed_splits field.
 #[instrument(level = "debug", skip_all)]
@@ -736,7 +783,7 @@ pub(crate) async fn search_partial_hits_phase(
     search_request: &SearchRequest,
     split_metadatas: &[SplitMetadata],
     cluster_client: &ClusterClient,
-) -> crate::Result<LeafSearchResponse> {
+) -> crate::Result<(LeafSearchResponse, Option<RootResourceStats>)> {
     let leaf_search_responses: Vec<LeafSearchResponse> =
         if is_metadata_count_request(search_request) {
             get_count_from_metadata(split_metadatas)
@@ -758,6 +805,11 @@
         try_join_all(leaf_request_tasks).await?
     };

+    // Compute root-level resource stats from the per-leaf responses *before*
+    // they get merged, since the merge collapses leaf identity. The merge
+    // wall time itself is added below once we've measured it.
+    let mut root_resource_stats = compute_root_resource_stats(&leaf_search_responses);
+
     let merge_collector =
         make_merge_collector(search_request, searcher_context.get_aggregation_limits())?;

@@ -768,6 +820,7 @@
     let leaf_search_results: Vec<tantivy::Result<LeafSearchResponse>> =
         leaf_search_responses.into_iter().map(Ok).collect_vec();
     let span = info_span!("merge_fruits");
+    let merge_start = Instant::now();
     let leaf_search_response = crate::search_thread_pool()
         .run_cpu_intensive(move || {
             let _span_guard = span.enter();
@@ -776,6 +829,9 @@
         .await
         .context("failed to merge leaf search responses")?
         .map_err(|error: TantivyError| crate::SearchError::Internal(error.to_string()))?;
+    if let Some(stats) = root_resource_stats.as_mut() {
+        stats.root_merge_microsecs = merge_start.elapsed().as_micros() as u64;
+    }
     debug!(
         num_hits = leaf_search_response.num_hits,
         num_failed_splits = leaf_search_response.failed_splits.len(),
@@ -804,7 +860,7 @@
         quickwit_common::rate_limited_error!(limit_per_min=6, num_failed_splits = leaf_search_response.failed_splits.len(), failed_splits = ?PrettySample::new(&leaf_search_response.failed_splits, 5), "leaf search response contains failed splits");
     }
-    Ok(leaf_search_response)
+    Ok((leaf_search_response, root_resource_stats))
 }

 pub(crate) fn get_snippet_request(search_request: &SearchRequest) -> Option<SnippetRequest> {
@@ -979,8 +1035,9 @@ async fn root_search_aux(
     cluster_client: &ClusterClient,
 ) -> crate::Result<SearchResponse> {
     debug!(split_metadatas = ?PrettySample::new(&split_metadatas, 5));
-    let (first_phase_result, scroll_key_and_start_offset_opt): (
+    let (first_phase_result, root_resource_stats, scroll_key_and_start_offset_opt): (
         LeafSearchResponse,
+        Option<RootResourceStats>,
         Option<ScrollKeyAndStartOffset>,
     ) = search_partial_hits_phase_with_scroll(
         searcher_context,
@@ -1021,6 +1078,7 @@
             .map(ToString::to_string),
         failed_splits: first_phase_result.failed_splits,
         num_successful_splits: first_phase_result.num_successful_splits,
+        resource_stats: root_resource_stats,
     })
 }

@@ -1809,7 +1867,7 @@ mod tests {
         ListIndexesMetadataResponse, ListSplitsResponse, MockMetastoreService,
     };
     use quickwit_proto::search::{
-        ScrollRequest, SortByValue, SortOrder, SortValue, SplitSearchError,
+        ResourceStats, ScrollRequest, SortByValue, SortOrder, SortValue, SplitSearchError,
     };
     use quickwit_query::query_ast::{qast_helper, qast_json_helper, query_ast_from_user_text};
     use tantivy::schema::{FAST, STORED, TEXT};
@@ -5387,4 +5445,118 @@ mod tests {
         assert!(result.is_some());
         assert_ne!(result.unwrap(), intermediate_bytes);
     }
+
+    #[test]
+    fn test_compute_root_resource_stats_empty() {
+        assert_eq!(compute_root_resource_stats(&[]), None);
+    }
+
+    #[test]
+    fn test_compute_root_resource_stats_no_stats() {
+        // 
Leaf responses without `resource_stats` (e.g. metadata-only count + // requests) should not produce a root stats object. + let leaf_responses = vec![LeafSearchResponse { + num_hits: 42, + partial_hits: Vec::new(), + failed_splits: Vec::new(), + num_attempted_splits: 1, + num_successful_splits: 1, + intermediate_aggregation_result: None, + resource_stats: None, + }]; + assert_eq!(compute_root_resource_stats(&leaf_responses), None); + } + + #[test] + fn test_compute_root_resource_stats_aggregates() { + let leaf_a = LeafSearchResponse { + num_hits: 100, + partial_hits: Vec::new(), + failed_splits: Vec::new(), + num_attempted_splits: 1, + num_successful_splits: 1, + intermediate_aggregation_result: None, + resource_stats: Some(ResourceStats { + short_lived_cache_num_bytes: 1_000, + split_num_docs: 10_000, + warmup_microsecs: 200, + cpu_thread_pool_wait_microsecs: 50, + cpu_microsecs: 800, + aggregation_cpu_microsecs: 300, + wall_microsecs: 1_500, + }), + }; + let leaf_b = LeafSearchResponse { + num_hits: 50, + partial_hits: Vec::new(), + failed_splits: Vec::new(), + num_attempted_splits: 1, + num_successful_splits: 1, + intermediate_aggregation_result: None, + resource_stats: Some(ResourceStats { + short_lived_cache_num_bytes: 4_000, + split_num_docs: 20_000, + warmup_microsecs: 700, + cpu_thread_pool_wait_microsecs: 150, + cpu_microsecs: 2_200, + aggregation_cpu_microsecs: 100, + wall_microsecs: 3_500, + }), + }; + let stats = compute_root_resource_stats(&[leaf_a, leaf_b]).unwrap(); + // Sums. + assert_eq!(stats.total_cpu_microsecs, 3_000); + assert_eq!(stats.total_warmup_microsecs, 900); + assert_eq!(stats.total_cpu_thread_pool_wait_microsecs, 200); + assert_eq!(stats.total_aggregation_cpu_microsecs, 400); + assert_eq!(stats.total_short_lived_cache_num_bytes, 5_000); + assert_eq!(stats.total_split_num_docs, 30_000); + assert_eq!(stats.total_num_docs_matched, 150); + assert_eq!(stats.sum_leaf_wall_microsecs, 5_000); + // Max — leaf_b is the slow leaf. + assert_eq!(stats.max_leaf_wall_microsecs, 3_500); + // Counter. + assert_eq!(stats.num_leaf_responses, 2); + // Root merge time is set by the caller, not by the helper. + assert_eq!(stats.root_merge_microsecs, 0); + } + + #[test] + fn test_compute_root_resource_stats_partial() { + // One leaf reports stats, the other doesn't (e.g. mixed metadata + // count + real search). The helper should still emit a stats + // object aggregating what it has. + let leaf_with_stats = LeafSearchResponse { + num_hits: 7, + partial_hits: Vec::new(), + failed_splits: Vec::new(), + num_attempted_splits: 1, + num_successful_splits: 1, + intermediate_aggregation_result: None, + resource_stats: Some(ResourceStats { + short_lived_cache_num_bytes: 0, + split_num_docs: 1_000, + warmup_microsecs: 100, + cpu_thread_pool_wait_microsecs: 0, + cpu_microsecs: 250, + aggregation_cpu_microsecs: 0, + wall_microsecs: 400, + }), + }; + let leaf_no_stats = LeafSearchResponse { + num_hits: 13, + partial_hits: Vec::new(), + failed_splits: Vec::new(), + num_attempted_splits: 1, + num_successful_splits: 1, + intermediate_aggregation_result: None, + resource_stats: None, + }; + let stats = compute_root_resource_stats(&[leaf_with_stats, leaf_no_stats]).unwrap(); + assert_eq!(stats.num_leaf_responses, 2); + // num_docs_matched still counts num_hits from every leaf. 
+        assert_eq!(stats.total_num_docs_matched, 20);
+        assert_eq!(stats.total_cpu_microsecs, 250);
+        assert_eq!(stats.max_leaf_wall_microsecs, 400);
+    }
 }
diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs
index a4a31a856b5..425f3cdeae3 100644
--- a/quickwit/quickwit-search/src/scroll_context.rs
+++ b/quickwit/quickwit-search/src/scroll_context.rs
@@ -25,7 +25,7 @@ use base64::prelude::BASE64_STANDARD;
 use quickwit_common::metrics::GaugeGuard;
 use quickwit_common::shared_consts::SCROLL_BATCH_LEN;
 use quickwit_metastore::SplitMetadata;
-use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, SplitSearchError};
+use quickwit_proto::search::{PartialHit, SearchRequest, SplitSearchError};
 use quickwit_proto::types::IndexUid;
 use serde::{Deserialize, Serialize};
 use tokio::sync::RwLock;
@@ -107,7 +107,9 @@ impl ScrollContext {
         searcher_context: &SearcherContext,
     ) -> crate::Result {
         self.search_request.search_after = Some(previous_last_hit);
-        let leaf_search_response: LeafSearchResponse = crate::root::search_partial_hits_phase(
+        // Scroll batches refill the cache; the per-leaf stats are not
+        // surfaced anywhere, so we discard them here.
+        let (leaf_search_response, _root_resource_stats) = crate::root::search_partial_hits_phase(
             searcher_context,
             &self.indexes_metas_for_leaf_search,
             &self.search_request,
diff --git a/quickwit/quickwit-search/src/search_response_rest.rs b/quickwit/quickwit-search/src/search_response_rest.rs
index 58eddc7b927..593a6856522 100644
--- a/quickwit/quickwit-search/src/search_response_rest.rs
+++ b/quickwit/quickwit-search/src/search_response_rest.rs
@@ -15,7 +15,7 @@ use std::convert::TryFrom;
 use quickwit_common::truncate_str;
-use quickwit_proto::search::SearchResponse;
+use quickwit_proto::search::{RootResourceStats, SearchResponse};
 use quickwit_query::aggregations::AggregationResults as AggregationResultsProxy;
 use quickwit_query::query_ast::QueryAst;
 use serde::{Deserialize, Serialize};
@@ -58,6 +58,14 @@ pub struct SearchResponseRest {
     #[schema(value_type = Object)]
     #[serde(skip_serializing_if = "Option::is_none")]
     pub aggregations: Option<AggregationResultsProxy>,
+    /// Per-request execution telemetry. Surfaces sums/maxes of leaf-side
+    /// counters (CPU, warmup, queueing, aggregation CPU, docs matched,
+    /// bytes downloaded) plus root-side merge time. See
+    /// `RootResourceStats` in the proto definition for the diagnostic
+    /// playbook. Absent on scroll continuations and on requests that did
+    /// not exercise the distributed search path.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub resource_stats: Option<RootResourceStats>,
 }

 impl TryFrom<SearchResponse> for SearchResponseRest {
@@ -109,6 +117,7 @@ impl TryFrom<SearchResponse> for SearchResponseRest {
             elapsed_time_micros: search_response.elapsed_time_micros,
             errors: search_response.errors,
             aggregations: aggregations_opt,
+            resource_stats: search_response.resource_stats,
         })
     }
 }
diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs
index 5e04e6a4dcf..229ce760979 100644
--- a/quickwit/quickwit-search/src/service.rs
+++ b/quickwit/quickwit-search/src/service.rs
@@ -394,6 +394,9 @@ pub(crate) async fn scroll(
         aggregation_postcard: None,
         failed_splits: scroll_context.failed_splits,
         num_successful_splits: scroll_context.num_successful_splits,
+        // Scroll continuation reads from the cache and does not invoke the
+        // distributed search path, so there are no per-leaf stats to attach.
+ resource_stats: None, }) } /// [`SearcherContext`] provides a common set of variables diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs index def8a4c6ca7..ae9dff53fe9 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs @@ -481,6 +481,7 @@ mod tests { scroll_id: None, failed_splits: Vec::new(), num_successful_splits: 1, + resource_stats: None, }) }); let mock_search_service = Arc::new(mock_search_service); @@ -514,6 +515,7 @@ mod tests { scroll_id: None, failed_splits: Vec::new(), num_successful_splits: 1, + resource_stats: None, }) }); let mock_search_service = Arc::new(mock_search_service); diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index b1400fa12c0..53381bc4369 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use percent_encoding::percent_decode_str; use quickwit_config::validate_index_id_pattern; -use quickwit_proto::search::{CountHits, SortField, SortOrder}; +use quickwit_proto::search::{CountHits, RootResourceStats, SortField, SortOrder}; use quickwit_query::query_ast::query_ast_from_user_text; use quickwit_search::{SearchError, SearchPlanResponseRest, SearchResponseRest, SearchService}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -39,6 +39,7 @@ use crate::{BodyFormat, with_arg}; ), components(schemas( BodyFormat, + RootResourceStats, SearchRequestQueryString, SearchResponseRest, SearchPlanResponseRest, @@ -502,6 +503,7 @@ mod tests { elapsed_time_micros: 0u64, errors: Vec::new(), aggregations: None, + resource_stats: None, }; let search_response_json: JsonValue = serde_json::to_value(search_response)?; let expected_search_response_json: JsonValue = json!({
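
To make the proto's diagnostic playbook concrete, here is a minimal client-side sketch (not part of the diff above) that classifies a response from these fields. The `threads_per_leaf` constant and the local `RootResourceStats` mirror are illustrative assumptions; a real client would use the generated proto type and its own deployment's thread count.

```rust
// Hypothetical client-side triage over the fields added in this PR.
// Field names mirror `RootResourceStats`; `threads_per_leaf` is an
// assumed deployment constant, not something the response reports.
#[derive(Default)]
struct RootResourceStats {
    total_cpu_microsecs: u64,
    total_warmup_microsecs: u64,
    total_cpu_thread_pool_wait_microsecs: u64,
    total_aggregation_cpu_microsecs: u64,
    total_split_num_docs: u64,
    total_num_docs_matched: u64,
    num_leaf_responses: u64,
    max_leaf_wall_microsecs: u64,
    sum_leaf_wall_microsecs: u64,
    root_merge_microsecs: u64,
}

/// Applies the playbook's rules of thumb. `wall_micros` is
/// `elapsed_time_micros` from the top-level `SearchResponse`.
fn triage(s: &RootResourceStats, wall_micros: u64, threads_per_leaf: u64) -> Vec<&'static str> {
    let mut findings = Vec::new();
    let wall = wall_micros as f64;
    let cpu = s.total_cpu_microsecs as f64;
    // CPU-bound: close to the compute budget of the participating leaves.
    let budget = wall * (threads_per_leaf * s.num_leaf_responses) as f64;
    if budget > 0.0 && cpu >= 0.5 * budget {
        findings.push("cpu-bound: inspect predicates, phrase queries, ranges");
    }
    // I/O-bound: warmup dominates the elapsed time.
    if s.total_warmup_microsecs as f64 >= 0.5 * wall {
        findings.push("io-bound: cold cache, large fast fields, many splits");
    }
    // Overloaded: queued longer than we computed.
    if s.total_cpu_thread_pool_wait_microsecs >= s.total_cpu_microsecs {
        findings.push("cluster overloaded: CPU pool saturated by other requests");
    }
    // Aggregation-heavy: most CPU goes to the aggregation collector.
    if s.total_aggregation_cpu_microsecs as f64 >= 0.5 * cpu {
        findings.push("aggregation-heavy: check bucket cardinality");
    }
    // Imbalance: slowest leaf far above the average leaf wall time.
    if s.num_leaf_responses > 0 && s.sum_leaf_wall_microsecs > 0 {
        let avg = s.sum_leaf_wall_microsecs as f64 / s.num_leaf_responses as f64;
        if (s.max_leaf_wall_microsecs as f64 - avg) / avg >= 1.0 {
            findings.push("leaf imbalance: uneven splits or one cold/starved leaf");
        }
    }
    // Barely selective: the query matched most of the haystack.
    if s.total_split_num_docs > 0 && s.total_num_docs_matched >= s.total_split_num_docs / 2 {
        findings.push("barely selective: tighten filters or time range");
    }
    findings
}
```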