From 88317cd5451a94a0497f2998e07ea0eda590efb5 Mon Sep 17 00:00:00 2001 From: vianney Date: Wed, 24 Jun 2026 15:59:35 +0200 Subject: [PATCH] feat(stats): send telemetry for cardinality limits --- Cargo.lock | 2 + datadog-ipc/src/shm_stats.rs | 4 +- datadog-sidecar/Cargo.toml | 2 +- datadog-sidecar/src/service/stats_flusher.rs | 2 + libdd-data-pipeline/Cargo.toml | 4 +- libdd-data-pipeline/src/otlp/metrics.rs | 2 +- .../src/trace_exporter/stats.rs | 3 + libdd-trace-stats/Cargo.toml | 8 +- .../src/span_concentrator/mod.rs | 29 +- .../src/span_concentrator/tests.rs | 44 +-- libdd-trace-stats/src/stats_exporter.rs | 305 +++++++++++++++++- 11 files changed, 359 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2d51516d26..47d6507641 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3444,7 +3444,9 @@ dependencies = [ "libdd-capabilities-impl", "libdd-common", "libdd-ddsketch", + "libdd-dogstatsd-client", "libdd-shared-runtime", + "libdd-telemetry", "libdd-trace-obfuscation", "libdd-trace-protobuf", "libdd-trace-utils", diff --git a/datadog-ipc/src/shm_stats.rs b/datadog-ipc/src/shm_stats.rs index 011bb1e2f7..fa295cafa4 100644 --- a/datadog-ipc/src/shm_stats.rs +++ b/datadog-ipc/src/shm_stats.rs @@ -819,8 +819,8 @@ impl ShmSpanConcentrator { } impl FlushableConcentrator for ShmSpanConcentrator { - fn flush_buckets(&mut self, force: bool) -> Vec { - self.drain_buckets(force) + fn flush_buckets(&mut self, force: bool) -> (Vec, u64) { + (self.drain_buckets(force), 0) } } diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index 443b292b32..b3f8e4b28b 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -26,7 +26,7 @@ datadog-sidecar-macros = { path = "../datadog-sidecar-macros" } libdd-telemetry = { path = "../libdd-telemetry", features = ["tracing"] } libdd-data-pipeline = { path = "../libdd-data-pipeline" } libdd-trace-utils = { path = "../libdd-trace-utils" } -libdd-trace-stats = { path = "../libdd-trace-stats", 
default-features=false, features = ["https"] } +libdd-trace-stats = { path = "../libdd-trace-stats", features = ["telemetry", "dogstatsd"] } libdd-remote-config = { path = "../libdd-remote-config" } datadog-live-debugger = { path = "../datadog-live-debugger" } datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics"] } diff --git a/datadog-sidecar/src/service/stats_flusher.rs b/datadog-sidecar/src/service/stats_flusher.rs index 6b9890d761..13332bbb82 100644 --- a/datadog-sidecar/src/service/stats_flusher.rs +++ b/datadog-sidecar/src/service/stats_flusher.rs @@ -119,6 +119,8 @@ fn make_exporter( )), #[cfg(feature = "stats-obfuscation")] "0", + None, + None, ) } diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index ca4c4075e0..46bc35a720 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -37,7 +37,7 @@ libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime", de libdd-telemetry = { version = "5.0.1", path = "../libdd-telemetry", default-features = false, optional = true} libdd-trace-protobuf = { version = "3.0.2", path = "../libdd-trace-protobuf" } libdd-trace-normalization = { version = "2.0.0", path = "../libdd-trace-normalization" } -libdd-trace-stats = { version = "5.0.0", path = "../libdd-trace-stats", default-features = false } +libdd-trace-stats = { version = "5.0.0", path = "../libdd-trace-stats", default-features = false, features = ["dogstatsd"] } libdd-trace-utils = { version = "8.0.0", path = "../libdd-trace-utils", default-features = false } libdd-trace-obfuscation = { version = "4.0.0", path = "../libdd-trace-obfuscation", default-features = false, optional = true } libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" } @@ -86,7 +86,7 @@ duplicate = "2.0.1" [features] default = ["https", "telemetry"] -telemetry = ["libdd-telemetry"] +telemetry = ["libdd-telemetry", "libdd-trace-stats/telemetry"] https = [ 
"libdd-common/https", "libdd-capabilities-impl/https", diff --git a/libdd-data-pipeline/src/otlp/metrics.rs b/libdd-data-pipeline/src/otlp/metrics.rs index 0360aaed57..cf1be7741e 100644 --- a/libdd-data-pipeline/src/otlp/metrics.rs +++ b/libdd-data-pipeline/src/otlp/metrics.rs @@ -226,7 +226,7 @@ pub struct OtlpStatsExporter { impl OtlpStatsExporter { /// Flush the concentrator and export stats; returns `Ok(true)` if anything was sent. async fn send(&self, force_flush: bool, max_retries: u32) -> anyhow::Result { - let buckets = { + let (buckets, _collapsed_count) = { #[allow(clippy::unwrap_used)] let mut c = self.concentrator.lock().unwrap(); c.flush_with_otlp_exact(SystemTime::now(), force_flush) diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 88960a28c2..81996006d6 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -166,6 +166,9 @@ fn create_and_start_stats_worker< client_side_stats.obfuscation_config.clone(), #[cfg(feature = "stats-obfuscation")] SUPPORTED_OBFUSCATION_VERSION_STR, + #[cfg(feature = "telemetry")] + None, + None, ); let worker_handle = ctx .shared_runtime diff --git a/libdd-trace-stats/Cargo.toml b/libdd-trace-stats/Cargo.toml index 19e489aae6..09f913e6e8 100644 --- a/libdd-trace-stats/Cargo.toml +++ b/libdd-trace-stats/Cargo.toml @@ -15,7 +15,9 @@ anyhow = "1.0" libdd-capabilities = { path = "../libdd-capabilities", version = "2.0.0" } libdd-common = { version = "5.0.0", path = "../libdd-common", default-features = false } libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" } +libdd-dogstatsd-client = { version = "3.0.0", path = "../libdd-dogstatsd-client", default-features = false, optional = true } libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime", default-features = false } +libdd-telemetry = { version = "5.0.1", path = "../libdd-telemetry", default-features = false, 
optional = true } libdd-trace-protobuf = { version = "3.0.2", path = "../libdd-trace-protobuf" } libdd-trace-obfuscation = { version = "4.0.0", path = "../libdd-trace-obfuscation", default-features = false } libdd-trace-utils = { version = "8.0.0", path = "../libdd-trace-utils", default-features = false } @@ -49,5 +51,7 @@ tokio = { version = "1.23", features = ["rt-multi-thread", "macros", "test-util" [features] default = ["https"] stats-obfuscation = [] -https = ["libdd-common/https", "libdd-capabilities-impl/https", "libdd-shared-runtime/https"] -fips = ["libdd-common/fips", "libdd-capabilities-impl/fips", "libdd-shared-runtime/fips"] +telemetry = ["libdd-telemetry"] +dogstatsd = ["libdd-dogstatsd-client"] +https = ["libdd-common/https", "libdd-capabilities-impl/https", "libdd-shared-runtime/https","libdd-telemetry?/https","libdd-dogstatsd-client?/https" ] +fips = ["libdd-common/fips", "libdd-capabilities-impl/fips", "libdd-shared-runtime/fips", "libdd-telemetry?/fips", "libdd-dogstatsd-client?/fips"] diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 28f09ab43f..1d90b2717a 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -20,11 +20,13 @@ pub use stat_span::StatSpan; /// `StatsExporter` is generic over `C: FlushableConcentrator` so it can work with /// both the in-process [`SpanConcentrator`] and the SHM-backed `ShmSpanConcentrator`. pub trait FlushableConcentrator { - fn flush_buckets(&mut self, force: bool) -> Vec; + /// Flush time buckets and return them together with the number of spans that were + /// collapsed into the overflow sentinel bucket due to cardinality limiting. 
+ fn flush_buckets(&mut self, force: bool) -> (Vec, u64); } impl FlushableConcentrator for SpanConcentrator { - fn flush_buckets(&mut self, force: bool) -> Vec { + fn flush_buckets(&mut self, force: bool) -> (Vec, u64) { self.flush(SystemTime::now(), force) } } @@ -226,14 +228,26 @@ impl SpanConcentrator { /// Flush all stats bucket except for the `buffer_len` most recent. If `force` is true, flush /// all buckets. - pub fn flush(&mut self, now: SystemTime, force: bool) -> Vec { + /// + /// Returns a tuple of `(buckets, collapsed_spans)` where `collapsed_spans` is the total number + /// of spans that were collapsed into the overflow sentinel bucket due to cardinality limiting + /// across all flushed time buckets. + pub fn flush(&mut self, now: SystemTime, force: bool) -> (Vec, u64) { self.drain_due_buckets(now, force, StatsBucket::flush) } /// Like [`Self::flush`], but also emits exact per-cell scalars alongside each bucket for the /// OTLP trace-metrics path. The protobuf bucket inside each [`OtlpStatsBucket`] is identical /// to what [`Self::flush`] would produce, so the /v0.6/stats agent path is unaffected. - pub fn flush_with_otlp_exact(&mut self, now: SystemTime, force: bool) -> Vec { + /// + /// Returns a tuple of `(buckets, collapsed_spans)` where `collapsed_spans` is the total number + /// of spans that were collapsed into the overflow sentinel bucket due to cardinality limiting + /// across all flushed time buckets. 
+ pub fn flush_with_otlp_exact( + &mut self, + now: SystemTime, + force: bool, + ) -> (Vec, u64) { self.drain_due_buckets(now, force, StatsBucket::flush_with_otlp_exact) } @@ -242,7 +256,7 @@ impl SpanConcentrator { now: SystemTime, force: bool, encode: impl Fn(StatsBucket, u64) -> T, - ) -> Vec { + ) -> (Vec, u64) { // TODO: Wait for HashMap::extract_if to be stabilized to avoid a full drain let now_timestamp = system_time_to_unix_duration(now).as_nanos() as u64; let buckets: Vec<(u64, StatsBucket)> = self.buckets.drain().collect(); @@ -253,7 +267,7 @@ impl SpanConcentrator { - (self.buffer_len as u64 - 1) * self.bucket_size }; let mut total_collapsed = 0; - buckets + let buckets_pb = buckets .into_iter() .filter_map(|(timestamp, bucket)| { // Always keep `bufferLen` buckets (default is 2: current + previous one). @@ -271,7 +285,8 @@ impl SpanConcentrator { total_collapsed += bucket.collapsed_count(); Some(encode(bucket, self.bucket_size)) }) - .collect() + .collect(); + (buckets_pb, total_collapsed) } } diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 81be4fda00..3fd088e09b 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -126,12 +126,12 @@ fn test_concentrator_oldest_timestamp_cold() { // Assert we didn't insert spans in older buckets for _ in 0..concentrator.buffer_len { - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(stats.len(), 0, "We should get 0 time buckets"); flushtime += Duration::from_nanos(concentrator.bucket_size); } - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(stats.len(), 1, "We should get exactly one time bucket"); @@ -188,12 +188,12 @@ fn test_concentrator_oldest_timestamp_hot() { } for _ in 0..(concentrator.buffer_len - 1) { - let stats = 
concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert!(stats.is_empty(), "We should get 0 time buckets"); flushtime += Duration::from_nanos(concentrator.bucket_size); } - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(stats.len(), 1, "We should get exactly one time bucket"); // First oldest bucket aggregates, it should have it all except the @@ -213,7 +213,7 @@ fn test_concentrator_oldest_timestamp_hot() { assert_counts_equal(expected, stats.first().unwrap().stats.clone()); flushtime += Duration::from_nanos(concentrator.bucket_size); - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(stats.len(), 1, "We should get exactly one time bucket"); // Stats of the last four spans. @@ -278,7 +278,7 @@ fn test_concentrator_stats_totals() { let mut flushtime = now; for _ in 0..=concentrator.buffer_len { - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); if stats.is_empty() { continue; } @@ -528,7 +528,7 @@ fn test_concentrator_stats_counts() { let mut flushtime = now; for _ in 0..=concentrator.buffer_len + 2 { - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); let expected_flushed_timestamps = align_timestamp( system_time_to_unix_duration(flushtime).as_nanos() as u64, concentrator.bucket_size, @@ -553,7 +553,7 @@ fn test_concentrator_stats_counts() { stats.first().unwrap().stats.clone(), ); - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!( stats.len(), 0, @@ -658,7 +658,7 @@ fn test_span_should_be_included_in_stats() { }, ]; - let stats = concentrator.flush( + let (stats, _) = concentrator.flush( now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64), false, ); @@ -696,7 +696,7 
@@ fn test_ignore_partial_spans() { concentrator.add_span(span); } - let stats = concentrator.flush( + let (stats, _) = concentrator.flush( now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64), false, ); @@ -727,10 +727,10 @@ fn test_force_flush() { let flushtime = now - Duration::from_secs(3600); // Bucket should not be flushed without force flush - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(0, stats.len()); - let stats = concentrator.flush(flushtime, true); + let (stats, _) = concentrator.flush(flushtime, true); assert_eq!(1, stats.len()); } @@ -900,7 +900,7 @@ fn test_peer_tags_aggregation() { }, ]; - let stats_with_peer_tags = concentrator_with_peer_tags.flush(flushtime, false); + let (stats_with_peer_tags, _) = concentrator_with_peer_tags.flush(flushtime, false); assert_counts_equal( expected_with_peer_tags, stats_with_peer_tags @@ -910,7 +910,7 @@ fn test_peer_tags_aggregation() { .clone(), ); - let stats_without_peer_tags = concentrator_without_peer_tags.flush(flushtime, false); + let (stats_without_peer_tags, _) = concentrator_without_peer_tags.flush(flushtime, false); assert_counts_equal( expected_without_peer_tags, stats_without_peer_tags @@ -1023,7 +1023,7 @@ fn test_peer_tags_quantization_aggregation() { ..Default::default() }]; - let stats_with_peer_tags = concentrator_with_peer_tags.flush(flushtime, false); + let (stats_with_peer_tags, _) = concentrator_with_peer_tags.flush(flushtime, false); assert_counts_equal( expected_with_peer_tags, stats_with_peer_tags @@ -1193,7 +1193,7 @@ fn test_base_service_peer_tag() { }, ]; - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_counts_equal( expected, stats @@ -1497,7 +1497,7 @@ fn test_pb_span() { // Flush and get stats let flushtime = now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64); - let stats = 
concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(stats.len(), 1, "Should get exactly one time bucket"); let bucket = &stats[0]; @@ -1610,7 +1610,7 @@ fn test_flush_with_otlp_exact_per_cell_scalars() { concentrator.add_span(s); } - let flushed = concentrator.flush_with_otlp_exact(now, true); + let flushed = concentrator.flush_with_otlp_exact(now, true).0; assert_eq!(flushed.len(), 1); let b = &flushed[0]; assert_eq!(b.exact.len(), 1); @@ -1676,7 +1676,7 @@ fn test_cardinality_limit_collapse() { concentrator.add_span(&span); } - let buckets = concentrator.flush(SystemTime::now(), true); + let (buckets, _) = concentrator.flush(SystemTime::now(), true); assert!(!buckets.is_empty(), "should get at least one time bucket"); let stats = &buckets[0].stats; @@ -1734,7 +1734,7 @@ fn test_overflow_bucket_counts() { concentrator.add_span(&span); } - let buckets = concentrator.flush(SystemTime::now(), true); + let (buckets, _) = concentrator.flush(SystemTime::now(), true); assert!(!buckets.is_empty()); let stats = &buckets[0].stats; @@ -1783,7 +1783,7 @@ fn test_no_collapse_within_limit() { concentrator.add_span(&span); } - let buckets = concentrator.flush(SystemTime::now(), true); + let (buckets, _) = concentrator.flush(SystemTime::now(), true); assert!(!buckets.is_empty()); let stats = &buckets[0].stats; @@ -1835,7 +1835,7 @@ fn test_overflow_bucket_key_sentinel_values() { concentrator.add_span(&first); concentrator.add_span(&second); - let buckets = concentrator.flush(SystemTime::now(), true); + let (buckets, _) = concentrator.flush(SystemTime::now(), true); assert!(!buckets.is_empty()); let stats = &buckets[0].stats; diff --git a/libdd-trace-stats/src/stats_exporter.rs b/libdd-trace-stats/src/stats_exporter.rs index f92c54e8d7..c771fd5b2b 100644 --- a/libdd-trace-stats/src/stats_exporter.rs +++ b/libdd-trace-stats/src/stats_exporter.rs @@ -14,7 +14,7 @@ use 
crate::span_concentrator::SharedStatsComputationObfuscationConfig; use crate::span_concentrator::{FlushableConcentrator, SpanConcentrator}; use async_trait::async_trait; use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; -use libdd_common::Endpoint; +use libdd_common::{tag, Endpoint}; use libdd_shared_runtime::Worker; use libdd_trace_protobuf::pb; use libdd_trace_utils::send_with_retry::{send_with_retry, RetryStrategy}; @@ -25,6 +25,12 @@ use tracing::error; pub const STATS_ENDPOINT_PATH: &str = "/v0.6/stats"; +/// Health metric name for the number of spans collapsed. +pub const COLLAPSED_SPANS_HEALTH_METRIC: &str = "datadog.tracer.stats.collapsed_spans"; + +/// Telemetry metric name for the number of spans collapsed. +pub const COLLAPSED_SPANS_TELEMETRY_METRIC: &str = "tracers.stats_collapsed_spans"; + /// Metadata needed by the stats exporter to annotate payloads and HTTP requests. #[derive(Clone, Default, Debug)] pub struct StatsMetadata { @@ -93,6 +99,15 @@ pub struct StatsExporter< obfuscation_config: SharedStatsComputationObfuscationConfig, #[cfg(feature = "stats-obfuscation")] supported_obfuscation_version: &'static str, + /// Optional telemetry handle and context key. + #[cfg(feature = "telemetry")] + telemetry: Option<( + libdd_telemetry::worker::TelemetryWorkerHandle, + libdd_telemetry::metrics::ContextKey, + )>, + /// Optional DogStatsD client. 
+ #[cfg(feature = "dogstatsd")] + dogstatsd: Option>, } impl @@ -105,6 +120,7 @@ impl /// agent /// - `meta` metadata used in ClientStatsPayload and as headers to send stats to the agent /// - `endpoint` the Endpoint used to send stats to the agent + #[allow(clippy::too_many_arguments)] pub fn new( flush_interval: time::Duration, concentrator: Arc>, @@ -114,7 +130,22 @@ impl #[cfg(feature = "stats-obfuscation")] obfuscation_config: SharedStatsComputationObfuscationConfig, #[cfg(feature = "stats-obfuscation")] supported_obfuscation_version: &'static str, + #[cfg(feature = "telemetry")] telemetry: Option< + libdd_telemetry::worker::TelemetryWorkerHandle, + >, + #[cfg(feature = "dogstatsd")] dogstatsd: Option>, ) -> Self { + #[cfg(feature = "telemetry")] + let telemetry = telemetry.map(|handle| { + let key = handle.register_metric_context( + COLLAPSED_SPANS_TELEMETRY_METRIC.to_string(), + vec![tag!("collapsed_spans", "whole_key")], + libdd_telemetry::data::metrics::MetricType::Count, + true, + libdd_telemetry::data::metrics::MetricNamespace::Tracers, + ); + (handle, key) + }); Self { flush_interval, concentrator, @@ -126,6 +157,10 @@ impl obfuscation_config, #[cfg(feature = "stats-obfuscation")] supported_obfuscation_version, + #[cfg(feature = "telemetry")] + telemetry, + #[cfg(feature = "dogstatsd")] + dogstatsd, } } @@ -146,7 +181,23 @@ impl /// case stats cannot be flushed since the concentrator might be corrupted. /// Returns `Ok(true)` if stats were sent, `Ok(false)` if the concentrator had nothing to send. 
pub async fn send(&self, force_flush: bool) -> anyhow::Result { - let payload = self.flush(force_flush); + let (payload, collapsed_spans) = self.flush(force_flush); + + if collapsed_spans > 0 { + #[cfg(feature = "telemetry")] + if let Some((handle, key)) = &self.telemetry { + let _ = handle.add_point(collapsed_spans as f64, key, vec![]); + } + #[cfg(feature = "dogstatsd")] + if let Some(client) = &self.dogstatsd { + client.send(vec![libdd_dogstatsd_client::DogStatsDAction::Count( + COLLAPSED_SPANS_HEALTH_METRIC, + collapsed_spans as i64, + [tag!("collapsed_spans", "whole_key")].iter(), + )]); + } + } + if payload.stats.is_empty() { return Ok(false); } @@ -194,14 +245,13 @@ impl /// # Panic /// Will panic if another thread panicked while holding the concentrator lock in which /// case stats cannot be flushed since the concentrator might be corrupted. - fn flush(&self, force_flush: bool) -> pb::ClientStatsPayload { + fn flush(&self, force_flush: bool) -> (pb::ClientStatsPayload, u64) { let sequence = self.sequence_id.fetch_add(1, Ordering::Relaxed); - encode_stats_payload( - &self.meta, - sequence, - #[allow(clippy::unwrap_used)] - self.concentrator.lock().unwrap().flush_buckets(force_flush), - ) + #[allow(clippy::unwrap_used)] + let (buckets, collapsed_spans) = + self.concentrator.lock().unwrap().flush_buckets(force_flush); + let payload = encode_stats_payload(&self.meta, sequence, buckets); + (payload, collapsed_spans) } } @@ -357,6 +407,10 @@ mod tests { StatsComputationObfuscationConfig::disabled(), #[cfg(feature = "stats-obfuscation")] "1", + #[cfg(feature = "telemetry")] + None, + #[cfg(feature = "dogstatsd")] + None, ); let send_status = stats_exporter.send(true).await; @@ -388,6 +442,10 @@ mod tests { StatsComputationObfuscationConfig::disabled(), #[cfg(feature = "stats-obfuscation")] "1", + #[cfg(feature = "telemetry")] + None, + #[cfg(feature = "dogstatsd")] + None, ); let send_status = stats_exporter.send(true).await; @@ -427,6 +485,10 @@ mod tests { 
StatsComputationObfuscationConfig::disabled(), #[cfg(feature = "stats-obfuscation")] "1", + #[cfg(feature = "telemetry")] + None, + #[cfg(feature = "dogstatsd")] + None, ); let _handle = shared_runtime .spawn_worker(stats_exporter, true) @@ -472,6 +534,10 @@ mod tests { StatsComputationObfuscationConfig::disabled(), #[cfg(feature = "stats-obfuscation")] "1", + #[cfg(feature = "telemetry")] + None, + #[cfg(feature = "dogstatsd")] + None, ); let _handle = shared_runtime @@ -543,6 +609,10 @@ mod tests { })), #[cfg(feature = "stats-obfuscation")] "1", + #[cfg(feature = "telemetry")] + None, + #[cfg(feature = "dogstatsd")] + None, ); let send_status = stats_exporter.send(true).await; @@ -550,4 +620,221 @@ mod tests { mock.assert_async().await; } + + /// Build a concentrator with `max_entries_per_bucket = 1` pre-seeded with four distinct spans + /// so that three spans are collapsed into the overflow bucket. + fn get_collapsed_concentrator() -> SpanConcentrator { + use libdd_trace_utils::span::{trace_utils, v04::SpanSlice}; + + let mut concentrator = SpanConcentrator::new( + BUCKETS_DURATION, + SystemTime::now(), + vec![], + vec![], + Some(1), // max 1 distinct key → second span collapses + #[cfg(feature = "stats-obfuscation")] + None, + ); + + let mut trace = vec![ + SpanSlice { + service: "svc", + resource: "resource-a", + duration: 10, + ..Default::default() + }, + SpanSlice { + service: "svc", + resource: "resource-b", + duration: 20, + ..Default::default() + }, + SpanSlice { + service: "svc", + resource: "resource-c", + duration: 20, + ..Default::default() + }, + SpanSlice { + service: "svc", + resource: "resource-d", + duration: 20, + ..Default::default() + }, + ]; + trace_utils::compute_top_level_span(trace.as_mut_slice()); + for span in &trace { + concentrator.add_span(span); + } + concentrator + } + + /// Verify that when `collapsed_spans == 0` the DogStatsD socket receives nothing. 
+ #[cfg(feature = "dogstatsd")] + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_no_emission_when_zero() { + use std::net; + + let server = MockServer::start_async().await; + server + .mock_async(|_when, then| { + then.status(200).body(""); + }) + .await; + + // Bind a UDP socket so we can detect whether anything arrives. + let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind UDP socket"); + socket + .set_read_timeout(Some(std::time::Duration::from_millis(200))) + .unwrap(); + let addr = socket.local_addr().unwrap().to_string(); + + let dogstatsd_client = Arc::new( + libdd_dogstatsd_client::new(libdd_common::Endpoint::from_slice(&addr)) + .expect("failed to create dogstatsd client"), + ); + + // get_test_concentrator() has no cardinality collapse: collapsed_spans will be 0. + let stats_exporter = StatsExporter::::new( + BUCKETS_DURATION, + Arc::new(Mutex::new(get_test_concentrator())), + get_test_metadata(), + Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), + NativeCapabilities::new_client(), + #[cfg(feature = "stats-obfuscation")] + StatsComputationObfuscationConfig::disabled(), + #[cfg(feature = "stats-obfuscation")] + "1", + #[cfg(feature = "telemetry")] + None, + Some(dogstatsd_client), + ); + + stats_exporter.send(true).await.unwrap(); + + // The socket must not have received any datagram. + let mut buf = [0u8; 256]; + let result = socket.recv(&mut buf); + assert!( + result.is_err(), + "No DogStatsD datagram expected when collapsed_spans == 0" + ); + } + + /// Verify that `COLLAPSED_SPANS_HEALTH_METRIC` is sent to DogStatsD when spans are collapsed.
+ #[cfg(feature = "dogstatsd")] + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_collapsed_spans_dogstatsd() { + use std::net; + + let server = MockServer::start_async().await; + server + .mock_async(|_when, then| { + then.status(200).body(""); + }) + .await; + + let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind UDP socket"); + socket + .set_read_timeout(Some(std::time::Duration::from_millis(500))) + .unwrap(); + let addr = socket.local_addr().unwrap().to_string(); + + let dogstatsd_client = Arc::new( + libdd_dogstatsd_client::new(libdd_common::Endpoint::from_slice(&addr)) + .expect("failed to create dogstatsd client"), + ); + + let stats_exporter = StatsExporter::::new( + BUCKETS_DURATION, + Arc::new(Mutex::new(get_collapsed_concentrator())), + get_test_metadata(), + Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), + NativeCapabilities::new_client(), + #[cfg(feature = "stats-obfuscation")] + StatsComputationObfuscationConfig::disabled(), + #[cfg(feature = "stats-obfuscation")] + "1", + #[cfg(feature = "telemetry")] + None, + Some(dogstatsd_client), + ); + + stats_exporter.send(true).await.unwrap(); + + let mut buf = [0u8; 256]; + let n = socket + .recv(&mut buf) + .expect("expected a DogStatsD datagram"); + let datagram = std::str::from_utf8(&buf[..n]).expect("valid utf-8"); + assert_eq!( + datagram, "datadog.tracer.stats.collapsed_spans:3|c|#collapsed_spans:whole_key", + "DogStatsD datagram must match the expected format" + ); + } + + /// Verify that `COLLAPSED_SPANS_TELEMETRY_METRIC` is enqueued to the telemetry worker when + /// spans are collapsed. This does not verify the actual value of the metric.
+ #[cfg(feature = "telemetry")] + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_collapsed_spans_telemetry() { + use libdd_telemetry::worker::TelemetryWorkerBuilder; + + let server = MockServer::start_async().await; + server + .mock_async(|_when, then| { + then.status(200).body(""); + }) + .await; + + let (handle, _join_handle) = TelemetryWorkerBuilder::new( + "test-host".to_string(), + "test-service".to_string(), + "rust".to_string(), + "1.0".to_string(), + "0.0.0".to_string(), + ) + .spawn(); + + let stats_exporter = StatsExporter::::new( + BUCKETS_DURATION, + Arc::new(Mutex::new(get_collapsed_concentrator())), + get_test_metadata(), + Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), + NativeCapabilities::new_client(), + #[cfg(feature = "stats-obfuscation")] + StatsComputationObfuscationConfig::disabled(), + #[cfg(feature = "stats-obfuscation")] + "1", + #[cfg(feature = "telemetry")] + Some(handle), + #[cfg(feature = "dogstatsd")] + None, + ); + + stats_exporter.send(true).await.unwrap(); + + let stats_exporter_ref = &stats_exporter; + let (handle_ref, _key) = stats_exporter_ref + .telemetry + .as_ref() + .expect("telemetry must be set"); + let receiver = handle_ref.stats().expect("failed to request stats"); + let stats = receiver.await.expect("failed to receive stats"); + // metric_contexts == 1 verifies that exactly one metric name was registered + // (i.e. COLLAPSED_SPANS_TELEMETRY_METRIC and nothing else). + // metric_buckets.buckets == 1 verifies that a data point was recorded for it. + // However it does not check the value of the data point. + assert_eq!( + stats.metric_contexts, 1, + "exactly one metric context (COLLAPSED_SPANS_METRIC) should be registered" + ); + assert_eq!( + stats.metric_buckets.buckets, 1, + "exactly one metric bucket expected after one collapsed-spans emission" + ); + } }