From 1c25e73cbabdba149949681f79fb821b92869724 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 24 Jun 2026 12:16:36 -0400 Subject: [PATCH 1/5] implement additional_metrics_tags in libdd-trace-stats --- datadog-ipc/src/shm_stats.rs | 1 + .../src/trace_exporter/builder.rs | 1 + .../src/trace_exporter/stats.rs | 1 + libdd-trace-protobuf/src/pb.rs | 10 ++- libdd-trace-stats/README.md | 1 + .../src/span_concentrator/aggregation.rs | 67 ++++++++++++++++--- .../src/span_concentrator/mod.rs | 24 ++++++- 7 files changed, 93 insertions(+), 12 deletions(-) diff --git a/datadog-ipc/src/shm_stats.rs b/datadog-ipc/src/shm_stats.rs index 44941ec667..83dbdf3d4d 100644 --- a/datadog-ipc/src/shm_stats.rs +++ b/datadog-ipc/src/shm_stats.rs @@ -818,6 +818,7 @@ impl ShmSpanConcentrator { .unwrap_or_default(), service_source: read_str!(f.service_source), span_derived_primary_tags: vec![], + additional_metric_tags: vec![], } } } diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 1153a6dbd7..00ac226ba3 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -541,6 +541,7 @@ impl TraceExporterBuilder { std::time::SystemTime::now(), span_kinds, self.peer_tags.clone(), + vec![], #[cfg(feature = "stats-obfuscation")] None, ))); diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 6a394d3249..f98d955b47 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -134,6 +134,7 @@ pub(crate) fn start_stats_computation< std::time::SystemTime::now(), span_kinds, peer_tags, + vec![], #[cfg(feature = "stats-obfuscation")] Some(client_side_stats.obfuscation_config.clone()), ))); diff --git a/libdd-trace-protobuf/src/pb.rs b/libdd-trace-protobuf/src/pb.rs index feae884fb7..2aef38d07e 100644 --- a/libdd-trace-protobuf/src/pb.rs +++ b/libdd-trace-protobuf/src/pb.rs @@ -654,18 +654,26 @@ pub struct ClientGroupedStats { #[serde(rename = "HTTPEndpoint")] pub http_endpoint: ::prost::alloc::string::String, /// @inject_tag: msg:"srv_src" + /// used to identify service override origin #[prost(string, tag = "21")] #[serde(default)] #[serde(rename = "srv_src")] pub service_source: ::prost::alloc::string::String, - /// used to identify service override origin /// span_derived_primary_tags are user-configured tags that are extracted from spans and used for stats aggregation /// E.g., `aws.s3.bucket`, `http.url`, or any custom tag + /// Deprecated: use additional_metric_tags (field 23) instead. #[prost(string, repeated, tag = "22")] #[serde(default)] pub span_derived_primary_tags: ::prost::alloc::vec::Vec< ::prost::alloc::string::String, >, + /// additional_metric_tags are tags to be used as additional dimensions for stats aggregation + /// E.g., `aws.s3.bucket`, `http.url`, or any custom tag + #[prost(string, repeated, tag = "23")] + #[serde(default)] + pub additional_metric_tags: ::prost::alloc::vec::Vec< + ::prost::alloc::string::String, + >, } /// Trilean is an expanded boolean type that is meant to differentiate between being unset and false. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] diff --git a/libdd-trace-stats/README.md b/libdd-trace-stats/README.md index 789b35f0e0..ab87d112fd 100644 --- a/libdd-trace-stats/README.md +++ b/libdd-trace-stats/README.md @@ -76,6 +76,7 @@ let mut concentrator = SpanConcentrator::new( SystemTime::now(), vec!["client".to_string(), "server".to_string()], // eligible span kinds vec!["peer.service".to_string()], // peer tag keys + vec!["example.key".to_string()], // additional metric tag keys ); // Add spans diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index cc5f82bdab..32c33443da 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -82,6 +82,7 @@ impl FixedAggregationKey { pub(super) struct BorrowedAggregationKey<'a> { fixed: FixedAggregationKey<&'a str>, peer_tags: Vec<(&'a str, Cow<'a, str>)>, + additional_metric_tags: Vec<(&'a str, &'a str)>, } impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { @@ -94,6 +95,12 @@ impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { .iter() .zip(other.peer_tags.iter()) .all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2) + && self.additional_metric_tags.len() == other.additional_metric_tags.len() + && self + .additional_metric_tags + .iter() + .zip(other.additional_metric_tags.iter()) + .all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2) } } @@ -108,6 +115,7 @@ impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { pub(super) struct OwnedAggregationKey { fixed: FixedAggregationKey, peer_tags: Vec<(String, String)>, + additional_metric_tags: Vec<(String, String)>, } impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey { @@ -119,6 +127,11 @@ impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey { .iter() .map(|(k, v)| (k.to_string(), v.to_string())) .collect(), + additional_metric_tags: value + .additional_metric_tags + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), } } } @@ -212,16 +225,27 @@ fn grpc_status_str_to_int_value(v: &str) -> Option { impl<'a> BorrowedAggregationKey<'a> { /// Return an AggregationKey matching the given span. /// - /// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the + /// If `peer_tag_keys` is not empty then the peer tags of the span will be included in the /// key. - pub(super) fn from_span>(span: &'a T, peer_tag_keys: &'a [String]) -> Self { - Self::from_obfuscated_span(span.resource(), span, peer_tag_keys) + /// If `additional_metric_tags` is not empty then matching span tags keys are included in the key. + pub(super) fn from_span>( + span: &'a T, + peer_tag_keys: &'a [String], + additional_metric_tag_keys: &'a [String], + ) -> Self { + Self::from_obfuscated_span( + span.resource(), + span, + peer_tag_keys, + additional_metric_tag_keys, + ) } pub(crate) fn from_obfuscated_span<'b, T>( resource_name: &'a str, span: &'b T, peer_tag_keys: &'b [String], + additional_metric_tag_keys: &'b [String], ) -> BorrowedAggregationKey<'a> where T: StatSpan<'b>, @@ -265,6 +289,14 @@ impl<'a> BorrowedAggregationKey<'a> { let service_source = span.get_meta(TAG_SVC_SRC).unwrap_or_default(); + let additional_metric_tags: Vec<(&'a str, &'a str)> = additional_metric_tag_keys + .iter() + .filter_map(|key| match span.get_meta(key.as_str()) { + Some(v) if !v.is_empty() => Some((key.as_str(), v)), + _ => None, + }) + .collect(); + Self { fixed: FixedAggregationKey { resource_name, @@ -283,6 +315,7 @@ impl<'a> BorrowedAggregationKey<'a> { is_trace_root: span.is_trace_root(), }, peer_tags, + additional_metric_tags, } } } @@ -312,6 +345,14 @@ impl From for OwnedAggregationKey { Some((key.to_string(), value.to_string())) }) .collect(), + additional_metric_tags: value + .additional_metric_tags + .into_iter() + .filter_map(|t| { + let (key, value) = t.split_once(':')?; + Some((key.to_string(), value.to_string())) + }) + .collect(), } } } @@ -524,7 +565,12 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl .map(|c| c.to_string()) .unwrap_or_default(), service_source: f.service_source, - span_derived_primary_tags: vec![], // Todo + span_derived_primary_tags: vec![], // Deprecated + additional_metric_tags: key + .additional_metric_tags + .into_iter() + .map(|(k, v)| format!("{k}:{v}")) + .collect(), } } @@ -547,12 +593,14 @@ mod tests { OwnedAggregationKey { fixed: self, peer_tags: vec![], + additional_metric_tags: vec![], } } fn into_key_with_peers(self, peer_tags: Vec<(String, String)>) -> OwnedAggregationKey { OwnedAggregationKey { fixed: self, peer_tags, + additional_metric_tags: vec![], } } } @@ -1065,7 +1113,7 @@ mod tests { ]; for (span, expected_key) in test_cases { - let borrowed_key = BorrowedAggregationKey::from_span(&span, &[]); + let borrowed_key = BorrowedAggregationKey::from_span(&span, &[], &[]); assert_eq!( OwnedAggregationKey::from(&borrowed_key), expected_key, @@ -1078,7 +1126,8 @@ mod tests { } for (span, expected_key) in test_cases_with_peer_tags { - let borrowed_key = BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice()); + let borrowed_key = + BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice(), &[]); assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key); assert_eq!( get_hash(&borrowed_key), @@ -1106,7 +1155,7 @@ mod tests { .into(), ..Default::default() }; - let key = BorrowedAggregationKey::from_span(&span_ipv4, &peer_tag_keys); + let key = BorrowedAggregationKey::from_span(&span_ipv4, &peer_tag_keys, &[]); let owned = OwnedAggregationKey::from(&key); assert_eq!( owned.peer_tags, @@ -1134,7 +1183,7 @@ mod tests { ..Default::default() }; let ipv6_keys = vec!["peer.hostname".to_string()]; - let key = BorrowedAggregationKey::from_span(&span_ipv6, &ipv6_keys); + let key = BorrowedAggregationKey::from_span(&span_ipv6, &ipv6_keys, &[]); let owned = OwnedAggregationKey::from(&key); assert_eq!( owned.peer_tags, @@ -1155,7 +1204,7 @@ mod tests { ..Default::default() }; let non_ip_keys = vec!["db.instance".to_string()]; - let key = BorrowedAggregationKey::from_span(&span_non_ip, &non_ip_keys); + let key = BorrowedAggregationKey::from_span(&span_non_ip, &non_ip_keys, &[]); let owned = OwnedAggregationKey::from(&key); assert_eq!( owned.peer_tags, diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 636e87744d..c0d639fcfb 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -94,6 +94,8 @@ pub struct SpanConcentrator { span_kinds_stats_computed: Vec, /// keys for supplementary tags that describe peer.service entities peer_tag_keys: Vec, + /// keys for additional tags on trace stats + additional_metric_tag_keys: Vec, #[cfg(feature = "stats-obfuscation")] obfuscation_config: SharedStatsComputationObfuscationConfig, } @@ -103,13 +105,15 @@ impl SpanConcentrator { /// - `bucket_size` is the size of the time buckets /// - `now` the current system time, used to define the oldest bucket /// - `span_kinds_stats_computed` list of span kinds eligible for stats computation - /// - `peer_tags_keys` list of keys considered as peer tags for aggregation + /// - `peer_tag_keys` list of keys considered as peer tags for aggregation + /// - `additional_metric_tag_keys` list of keys considered as addtional tags for aggregation /// - `obfuscation_config` optional and updatable config for resource key obfuscation pub fn new( bucket_size: Duration, now: SystemTime, span_kinds_stats_computed: Vec, peer_tag_keys: Vec, + additional_metric_tag_keys: Vec, #[cfg(feature = "stats-obfuscation")] obfuscation_config: Option< SharedStatsComputationObfuscationConfig, >, @@ -124,6 +128,7 @@ impl SpanConcentrator { buffer_len: 2, span_kinds_stats_computed, peer_tag_keys, + additional_metric_tag_keys, #[cfg(feature = "stats-obfuscation")] obfuscation_config: obfuscation_config.unwrap_or_default(), } @@ -149,6 +154,16 @@ impl SpanConcentrator { self.peer_tag_keys = peer_tags; } + /// Return the list of keys considered as additional_metric_tag_keys for aggregation + pub fn additional_metric_tag_keys(&self) -> &[String] { + &self.additional_metric_tag_keys + } + + /// Set the list of keys considered as additional_metric_tag_keys for aggregation + pub fn set_additional_metric_tag_keys(&mut self, tag_keys: Vec) { + self.additional_metric_tag_keys = tag_keys; + } + /// Return the bucket size used for aggregation pub fn get_bucket_size(&self) -> Duration { Duration::from_nanos(self.bucket_size) @@ -173,8 +188,13 @@ impl SpanConcentrator { res, span, self.peer_tag_keys.as_slice(), + self.additional_metric_tag_keys.as_slice(), + ), + None => BorrowedAggregationKey::from_span( + span, + self.peer_tag_keys.as_slice(), + self.additional_metric_tag_keys.as_slice(), ), - None => BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice()), }; self.buckets .entry(bucket_timestamp) From 1efdd88ce21e0292dbfe3b26da6090e04f409394 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 24 Jun 2026 14:38:06 -0400 Subject: [PATCH 2/5] update and add tests --- .../benches/span_concentrator_bench.rs | 1 + .../src/span_concentrator/tests.rs | 172 ++++++++++++++++++ libdd-trace-stats/src/stats_exporter.rs | 1 + 3 files changed, 174 insertions(+) diff --git a/libdd-trace-stats/benches/span_concentrator_bench.rs b/libdd-trace-stats/benches/span_concentrator_bench.rs index 03526acb3e..ae72916694 100644 --- a/libdd-trace-stats/benches/span_concentrator_bench.rs +++ b/libdd-trace-stats/benches/span_concentrator_bench.rs @@ -44,6 +44,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { now, vec![], vec!["db_name".into(), "bucket_s3".into()], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 2dd064d93a..cf1adf0ade 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -105,6 +105,7 @@ fn test_concentrator_oldest_timestamp_cold() { now, vec![], vec![], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -161,6 +162,7 @@ fn test_concentrator_oldest_timestamp_hot() { now, vec![], vec![], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -240,6 +242,7 @@ fn test_concentrator_stats_totals() { now, vec![], vec![], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -306,6 +309,7 @@ fn test_concentrator_stats_counts() { now, vec![], vec![], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -603,6 +607,7 @@ fn test_span_should_be_included_in_stats() { now, get_span_kinds(), vec![], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -683,6 +688,7 @@ fn test_ignore_partial_spans() { now, get_span_kinds(), vec![], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -708,6 +714,7 @@ fn test_force_flush() { now, get_span_kinds(), vec![], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -791,6 +798,7 @@ fn test_peer_tags_aggregation() { now, get_span_kinds(), vec![], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -799,6 +807,7 @@ fn test_peer_tags_aggregation() { now, get_span_kinds(), vec!["db.instance".to_string(), "db.system".to_string()], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -982,6 +991,7 @@ fn test_peer_tags_quantization_aggregation() { "db.system".to_string(), "peer.hostname".to_string(), ], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -1108,6 +1118,7 @@ fn test_base_service_peer_tag() { now, get_span_kinds(), vec!["db.instance".to_string(), "db.system".to_string()], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); @@ -1193,6 +1204,125 @@ fn test_base_service_peer_tag() { ); } +/// Test the additional metric tags aggregation +#[test] +fn test_additional_metric_tags_aggregation() { + let now = SystemTime::now(); + let mut spans = vec![ + get_test_span_with_meta( + now, + 1, + 0, + 100, + 5, + "A1", + "GET /objects", + 0, + &[("custom.primary", "a")], + &[("_dd.measured", 1.0)], + ), + get_test_span_with_meta( + now, + 2, + 0, + 100, + 5, + "A1", + "GET /objects", + 0, + &[("custom.primary", "b")], + &[("_dd.measured", 1.0)], + ), + ]; + compute_top_level_span(spans.as_mut_slice()); + + let mut concentrator_without_additional_metric_tags = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + vec![], + ); + let mut concentrator_with_additional_metric_tags = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + vec!["custom.primary".to_string()], + ); + for span in &spans { + concentrator_without_additional_metric_tags.add_span(span); + concentrator_with_additional_metric_tags.add_span(span); + } + + let flushtime = now + + Duration::from_nanos( + concentrator_with_additional_metric_tags.bucket_size + * concentrator_with_additional_metric_tags.buffer_len as u64, + ); + + let expected_without_additional_metric_tags = vec![pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "GET /objects".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + duration: 200, + hits: 2, + top_level_hits: 2, + errors: 0, + is_trace_root: pb::Trilean::True.into(), + ..Default::default() + }]; + + let expected_with_additional_metric_tags = vec![ + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "GET /objects".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + additional_metric_tags: vec!["custom.primary:a".to_string()], + duration: 100, + hits: 1, + top_level_hits: 1, + errors: 0, + is_trace_root: pb::Trilean::True.into(), + ..Default::default() + }, + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "GET /objects".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + additional_metric_tags: vec!["custom.primary:b".to_string()], + duration: 100, + hits: 1, + top_level_hits: 1, + errors: 0, + is_trace_root: pb::Trilean::True.into(), + ..Default::default() + }, + ]; + + assert_counts_equal( + expected_without_additional_metric_tags, + concentrator_without_additional_metric_tags + .flush(flushtime, false) + .first() + .expect("There should be at least one time bucket") + .stats + .clone(), + ); + assert_counts_equal( + expected_with_additional_metric_tags, + concentrator_with_additional_metric_tags + .flush(flushtime, false) + .first() + .expect("There should be at least one time bucket") + .stats + .clone(), + ); +} + #[test] fn test_compute_stats_for_span_kind() { let test_cases: Vec<(SpanSlice, bool)> = vec![ @@ -1330,6 +1460,7 @@ fn test_pb_span() { now, get_span_kinds(), vec!["db.instance".to_string(), "db.system".to_string()], + vec!["custom.primary".to_string()], #[cfg(feature = "stats-obfuscation")] None, ); @@ -1449,6 +1580,32 @@ fn test_pb_span() { span_events: vec![], } }, + // Span with measured flag and additional metric tags + { + let mut meta = std::collections::HashMap::new(); + meta.insert("custom.primary".to_string(), "val".to_string()); + + let mut metrics = std::collections::HashMap::new(); + metrics.insert("_dd.measured".to_string(), 1.0); + + pb::Span { + service: "service1".to_string(), + name: "query".to_string(), + resource: "database_query".to_string(), + trace_id: 1, + span_id: 6, + parent_id: 1, + start: (aligned_now - BUCKET_SIZE + 40) as i64, + duration: 150, + error: 1, + r#type: "db".to_string(), + meta, + metrics, + meta_struct: std::collections::HashMap::new(), + span_links: vec![], + span_events: vec![], + } + }, // Grpc span { let mut meta = std::collections::HashMap::new(); @@ -1551,6 +1708,20 @@ fn test_pb_span() { is_trace_root: pb::Trilean::False.into(), ..Default::default() }, + // Measured span with additional metric tags + pb::ClientGroupedStats { + service: "service1".to_string(), + resource: "database_query".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + duration: 150, + hits: 1, + top_level_hits: 0, + errors: 1, + is_trace_root: pb::Trilean::False.into(), + additional_metric_tags: vec!["custom.primary:val".to_string()], + ..Default::default() + }, pb::ClientGroupedStats { service: "service1".to_string(), name: "rpc.grpc".to_string(), @@ -1581,6 +1752,7 @@ fn test_flush_with_otlp_exact_per_cell_scalars() { now, get_span_kinds(), vec![], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); diff --git a/libdd-trace-stats/src/stats_exporter.rs b/libdd-trace-stats/src/stats_exporter.rs index c91a6de4c9..84037ff2c6 100644 --- a/libdd-trace-stats/src/stats_exporter.rs +++ b/libdd-trace-stats/src/stats_exporter.rs @@ -309,6 +309,7 @@ mod tests { SystemTime::now() - BUCKETS_DURATION * 3, vec![], vec![], + vec![], #[cfg(feature = "stats-obfuscation")] None, ); From 0aeaf8a40c204055675ab555479d6a4e136536f4 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Fri, 26 Jun 2026 13:09:39 -0400 Subject: [PATCH 3/5] sort, dedupe, and cap additional metric tags at 4 --- .../src/span_concentrator/mod.rs | 26 +++++++++++++-- .../src/span_concentrator/tests.rs | 32 +++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index c0d639fcfb..1e870567fd 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use std::time::{self, Duration, SystemTime}; use libdd_trace_protobuf::pb; +use tracing::warn; use aggregation::StatsBucket; @@ -15,6 +16,25 @@ pub use aggregation::{FixedAggregationKey, OtlpExactCell, OtlpExactGroup, OtlpSt pub mod stat_span; pub use stat_span::StatSpan; +const ADDITIONAL_METRIC_TAG_KEYS_CAP: usize = 4; + +/// Deduplicate, sort alphabetically, and cap `keys` at [`ADDITIONAL_METRIC_TAG_KEYS_CAP`]. +/// Excess keys are dropped and logged as a one-time warning. +fn normalize_additional_metric_tag_keys(mut keys: Vec) -> Vec { + keys.sort_unstable(); + keys.dedup(); + if keys.len() > ADDITIONAL_METRIC_TAG_KEYS_CAP { + let dropped = keys.split_off(ADDITIONAL_METRIC_TAG_KEYS_CAP); + warn!( + "additional_metric_tag_keys: {} additional metric tag keys exceed the cap of {}; dropping: {:?}", + dropped.len() + ADDITIONAL_METRIC_TAG_KEYS_CAP, + ADDITIONAL_METRIC_TAG_KEYS_CAP, + dropped, + ); + } + keys +} + /// Concentrators that can provide raw time buckets for export implement this trait. /// /// `StatsExporter` is generic over `C: FlushableConcentrator` so it can work with @@ -128,7 +148,9 @@ impl SpanConcentrator { buffer_len: 2, span_kinds_stats_computed, peer_tag_keys, - additional_metric_tag_keys, + additional_metric_tag_keys: normalize_additional_metric_tag_keys( + additional_metric_tag_keys, + ), #[cfg(feature = "stats-obfuscation")] obfuscation_config: obfuscation_config.unwrap_or_default(), } @@ -161,7 +183,7 @@ impl SpanConcentrator { /// Set the list of keys considered as additional_metric_tag_keys for aggregation pub fn set_additional_metric_tag_keys(&mut self, tag_keys: Vec) { - self.additional_metric_tag_keys = tag_keys; + self.additional_metric_tag_keys = normalize_additional_metric_tag_keys(tag_keys); } /// Return the bucket size used for aggregation diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index cf1adf0ade..bfee205ed3 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -1794,3 +1794,35 @@ fn test_flush_with_otlp_exact_per_cell_scalars() { assert_eq!(group.hits, 5); assert_eq!(group.errors, 2); } + +#[test] +fn test_normalize_additional_metric_tag_keys_sort() { + let keys = vec!["region".to_string(), "env".to_string(), "tenant".to_string()]; + let result = normalize_additional_metric_tag_keys(keys); + assert_eq!(result, vec!["env", "region", "tenant"]); +} + +#[test] +fn test_normalize_additional_metric_tag_keys_dedup() { + let keys = vec![ + "region".to_string(), + "region".to_string(), + "tenant".to_string(), + ]; + let result = normalize_additional_metric_tag_keys(keys); + assert_eq!(result, vec!["region", "tenant"]); +} + +#[test] +fn test_normalize_additional_metric_tag_keys_cap() { + let keys = vec![ + "aaa".to_string(), + "bbb".to_string(), + "ccc".to_string(), + "ddd".to_string(), + "eee".to_string(), + ]; + let result = normalize_additional_metric_tag_keys(keys); + assert_eq!(result, vec!["aaa", "bbb", "ccc", "ddd"]); + assert_eq!(result.len(), 4); +} From a94c64ddb22f56e2925d9c44c016d381d1ed0f8d Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Fri, 26 Jun 2026 13:15:04 -0400 Subject: [PATCH 4/5] set max length of additional metric tag value to 200 --- .../src/span_concentrator/aggregation.rs | 15 +++- .../src/span_concentrator/tests.rs | 70 +++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index 32c33443da..61e9d51925 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -10,10 +10,13 @@ use libdd_trace_obfuscation::ip_address::quantize_peer_ip_addresses; use libdd_trace_protobuf::pb; use libdd_trace_utils::span::SpanText; use std::borrow::{Borrow, Cow}; +use tracing::warn; use crate::span_concentrator::StatSpan; const TAG_STATUS_CODE: &str = "http.status_code"; +const ADDITIONAL_METRIC_TAG_VALUE_MAX_LEN: usize = 200; +const TRACER_BLOCKED_VALUE: &str = "tracer_blocked_value"; const TAG_SYNTHETICS: &str = "synthetics"; const TAG_SPANKIND: &str = "span.kind"; const TAG_ORIGIN: &str = "_dd.origin"; @@ -292,7 +295,17 @@ impl<'a> BorrowedAggregationKey<'a> { let additional_metric_tags: Vec<(&'a str, &'a str)> = additional_metric_tag_keys .iter() .filter_map(|key| match span.get_meta(key.as_str()) { - Some(v) if !v.is_empty() => Some((key.as_str(), v)), + Some(v) if !v.is_empty() => { + if v.len() > ADDITIONAL_METRIC_TAG_VALUE_MAX_LEN { + warn!( + "additional_metric_tags: value for key '{}' exceeds {} characters; substituting tracer_blocked_value", + key, ADDITIONAL_METRIC_TAG_VALUE_MAX_LEN, + ); + Some((key.as_str(), TRACER_BLOCKED_VALUE)) + } else { + Some((key.as_str(), v)) + } + } _ => None, }) .collect(); diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index bfee205ed3..92a8b4f8ae 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -1795,6 +1795,76 @@ fn test_flush_with_otlp_exact_per_cell_scalars() { assert_eq!(group.errors, 2); } +#[test] +fn test_additional_metric_tag_value_length_cap_substitutes_blocked_value() { + let now = SystemTime::now(); + let long_value = "x".repeat(201); + let meta = [("region", long_value.as_str())]; + let mut spans = vec![get_test_span_with_meta( + now, + 1, + 0, + 100, + 5, + "svc", + "GET /foo", + 0, + &meta, + &[("_dd.measured", 1.0)], + )]; + compute_top_level_span(spans.as_mut_slice()); + + let mut concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + vec!["region".to_string()], + ); + concentrator.add_span(&spans[0]); + + let flushtime = now + + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64); + let buckets = concentrator.flush(flushtime, false); + let tags = &buckets[0].stats[0].additional_metric_tags; + assert_eq!(tags, &["region:tracer_blocked_value"]); +} + +#[test] +fn test_additional_metric_tag_value_at_length_cap_passes_through() { + let now = SystemTime::now(); + let ok_value = "x".repeat(200); + let meta = [("region", ok_value.as_str())]; + let mut spans = vec![get_test_span_with_meta( + now, + 1, + 0, + 100, + 5, + "svc", + "GET /foo", + 0, + &meta, + &[("_dd.measured", 1.0)], + )]; + compute_top_level_span(spans.as_mut_slice()); + + let mut concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + vec!["region".to_string()], + ); + concentrator.add_span(&spans[0]); + + let flushtime = now + + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64); + let buckets = concentrator.flush(flushtime, false); + let tags = &buckets[0].stats[0].additional_metric_tags; + assert_eq!(tags, &[format!("region:{ok_value}")]); +} + #[test] fn test_normalize_additional_metric_tag_keys_sort() { let keys = vec!["region".to_string(), "env".to_string(), "tenant".to_string()]; From fc99d327bb54ca04a7c28b67117f546b11ea48a4 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Fri, 26 Jun 2026 14:20:47 -0400 Subject: [PATCH 5/5] enforce per-bucket cardinality limit on additional_metric_tags --- .../src/span_concentrator/aggregation.rs | 82 +++++++++-- .../src/span_concentrator/mod.rs | 24 +++- .../src/span_concentrator/tests.rs | 131 ++++++++++++++++++ 3 files changed, 227 insertions(+), 10 deletions(-) diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index 61e9d51925..1356636c07 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -331,6 +331,24 @@ impl<'a> BorrowedAggregationKey<'a> { additional_metric_tags, } } + + /// Return an owned copy of this key with all additional metric tag values replaced by + /// `TRACER_BLOCKED_VALUE`. Used when the per-bucket stat-entry limit is exceeded. + pub(super) fn into_masked_owned(self) -> OwnedAggregationKey { + OwnedAggregationKey { + fixed: self.fixed.convert(str::to_owned), + peer_tags: self + .peer_tags + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + additional_metric_tags: self + .additional_metric_tags + .iter() + .map(|(k, _)| (k.to_string(), TRACER_BLOCKED_VALUE.to_string())) + .collect(), + } + } } impl From for OwnedAggregationKey { @@ -468,19 +486,31 @@ pub struct OtlpStatsBucket { pub(super) struct StatsBucket { data: HashMap, start: u64, + /// Number of distinct entries with additional metric tags admitted this bucket. + additional_metric_tags_entry_count: usize, + /// Maximum distinct entries with additional metric tags per bucket. + additional_metric_tags_cardinality_limit: usize, } impl StatsBucket { - /// Return a new StatsBucket starting at the given timestamp - pub(super) fn new(start_timestamp: u64) -> Self { + /// Return a new StatsBucket starting at the given timestamp. + /// `additional_metric_tags_cardinality_limit` limits the number of distinct aggregation keys that include + /// additional metric tags; overflow entries have their tag values masked to TRACER_BLOCKED_VALUE. + pub(super) fn new(start_timestamp: u64, additional_metric_tags_cardinality_limit: usize) -> Self { Self { data: HashMap::new(), start: start_timestamp, + additional_metric_tags_entry_count: 0, + additional_metric_tags_cardinality_limit, } } /// Insert a value as stats in the group corresponding to the aggregation key, if it does /// not exist it creates it. + /// + /// If the key has additional metric tags and would create a new entry beyond the limit, all + /// additional tag values are replaced with `TRACER_BLOCKED_VALUE` before insertion. Keys that + /// already exist in this bucket always merge normally regardless of the limit. pub(super) fn insert( &mut self, key: BorrowedAggregationKey<'_>, @@ -489,13 +519,47 @@ impl StatsBucket { is_top_level: bool, grpc_method: &str, ) { - self.data - .entry_ref(&key) - .or_insert_with(|| GroupedStats { - grpc_method: grpc_method.to_owned(), - ..Default::default() - }) - .insert(duration, is_error, is_top_level); + if key.additional_metric_tags.is_empty() { + self.data + .entry_ref(&key) + .or_insert_with(|| GroupedStats { + grpc_method: grpc_method.to_owned(), + ..Default::default() + }) + .insert(duration, is_error, is_top_level); + return; + } + + // Key has additional metric tags: check existence before applying the limit. + match self.data.entry_ref(&key) { + hashbrown::hash_map::EntryRef::Occupied(mut e) => { + // Already exists — merge normally, no limit check needed. + e.get_mut().insert(duration, is_error, is_top_level); + } + hashbrown::hash_map::EntryRef::Vacant(e) => { + if self.additional_metric_tags_entry_count < self.additional_metric_tags_cardinality_limit { + // Under limit — admit new entry. + self.additional_metric_tags_entry_count += 1; + e.insert(GroupedStats { + grpc_method: grpc_method.to_owned(), + ..Default::default() + }) + .insert(duration, is_error, is_top_level); + } else { + // Cap exceeded — mask tag values and merge into the overflow entry. + // Drop the vacant entry to release the mutable borrow before re-entering. + drop(e); + let masked = key.into_masked_owned(); + self.data + .entry(masked) + .or_insert_with(|| GroupedStats { + grpc_method: grpc_method.to_owned(), + ..Default::default() + }) + .insert(duration, is_error, is_top_level); + } + } + } } /// Consume the bucket and return a ClientStatsBucket containing the bucket stats. diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 1e870567fd..b358a5adda 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -17,6 +17,7 @@ pub mod stat_span; pub use stat_span::StatSpan; const ADDITIONAL_METRIC_TAG_KEYS_CAP: usize = 4; +const DEFAULT_ADDITIONAL_METRIC_TAGS_CARDINALITY_LIMIT: usize = 100; /// Deduplicate, sort alphabetically, and cap `keys` at [`ADDITIONAL_METRIC_TAG_KEYS_CAP`]. /// Excess keys are dropped and logged as a one-time warning. @@ -116,6 +117,8 @@ pub struct SpanConcentrator { peer_tag_keys: Vec, /// keys for additional tags on trace stats additional_metric_tag_keys: Vec, + /// limit on distinct stat entries with additional metric tags per flush bucket + additional_metric_tags_cardinality_limit: usize, #[cfg(feature = "stats-obfuscation")] obfuscation_config: SharedStatsComputationObfuscationConfig, } @@ -151,6 +154,7 @@ impl SpanConcentrator { additional_metric_tag_keys: normalize_additional_metric_tag_keys( additional_metric_tag_keys, ), + additional_metric_tags_cardinality_limit: DEFAULT_ADDITIONAL_METRIC_TAGS_CARDINALITY_LIMIT, #[cfg(feature = "stats-obfuscation")] obfuscation_config: obfuscation_config.unwrap_or_default(), } @@ -186,6 +190,24 @@ impl SpanConcentrator { self.additional_metric_tag_keys = normalize_additional_metric_tag_keys(tag_keys); } + /// Return the per-bucket limit on distinct stat entries that include additional metric tags + pub fn additional_metric_tags_cardinality_limit(&self) -> usize { + self.additional_metric_tags_cardinality_limit + } + + /// Set the per-bucket limit on distinct stat entries that include additional metric tags. + /// Values less than or equal to 0 are rejected and the existing limit is preserved with a warning. + pub fn set_additional_metric_tags_cardinality_limit(&mut self, limit: usize) { + if limit == 0 { + warn!( + "DD_TRACE_STATS_ADDITIONAL_TAGS_CARDINALITY_LIMIT must be > 0; keeping default of {}", + self.additional_metric_tags_cardinality_limit, + ); + return; + } + self.additional_metric_tags_cardinality_limit = limit; + } + /// Return the bucket size used for aggregation pub fn get_bucket_size(&self) -> Duration { Duration::from_nanos(self.bucket_size) @@ -220,7 +242,7 @@ impl SpanConcentrator { }; self.buckets .entry(bucket_timestamp) - .or_insert(StatsBucket::new(bucket_timestamp)) + .or_insert(StatsBucket::new(bucket_timestamp, self.additional_metric_tags_cardinality_limit)) .insert( agg_key, span.duration(), diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 92a8b4f8ae..d83bce929c 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -1795,6 +1795,137 @@ fn test_flush_with_otlp_exact_per_cell_scalars() { assert_eq!(group.errors, 2); } +#[test] +fn test_additional_metric_tags_cardinality_limit_masks_overflow_entries() { + // With a cap of 1, the first distinct tag value is admitted; the second gets masked. + let now = SystemTime::now(); + let meta_a = [("region", "us-east-1")]; + let meta_b = [("region", "eu-west-1")]; + let mut spans = vec![ + get_test_span_with_meta(now, 1, 0, 100, 5, "svc", "GET /a", 0, &meta_a, &[("_dd.measured", 1.0)]), + get_test_span_with_meta(now, 2, 0, 100, 5, "svc", "GET /b", 0, &meta_b, &[("_dd.measured", 1.0)]), + ]; + compute_top_level_span(spans.as_mut_slice()); + + let mut concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + vec!["region".to_string()], + #[cfg(feature = "stats-obfuscation")] + None, + ); + concentrator.set_additional_metric_tags_cardinality_limit(1); + for span in &spans { + concentrator.add_span(span); + } + + let flushtime = + now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64); + let buckets = concentrator.flush(flushtime, false); + let mut tags: Vec<&str> = buckets[0] + .stats + .iter() + .flat_map(|s| s.additional_metric_tags.iter().map(String::as_str)) + .collect(); + tags.sort_unstable(); + assert_eq!(tags, vec!["region:tracer_blocked_value", "region:us-east-1"]); +} + +#[test] +fn test_additional_metric_tags_cardinality_limit_existing_keys_merge_after_limit() { + // A key admitted before the cap is hit continues merging after the cap is exceeded. + let now = SystemTime::now(); + let meta_a = [("region", "us-east-1")]; + let meta_b = [("region", "eu-west-1")]; + // Three spans: first two fill and exceed a cap of 1, third re-hits the admitted key. + let mut spans = vec![ + get_test_span_with_meta(now, 1, 0, 100, 5, "svc", "GET /a", 0, &meta_a, &[("_dd.measured", 1.0)]), + get_test_span_with_meta(now, 2, 0, 100, 5, "svc", "GET /b", 0, &meta_b, &[("_dd.measured", 1.0)]), + get_test_span_with_meta(now, 3, 0, 100, 5, "svc", "GET /a", 0, &meta_a, &[("_dd.measured", 1.0)]), + ]; + compute_top_level_span(spans.as_mut_slice()); + + let mut concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + vec!["region".to_string()], + #[cfg(feature = "stats-obfuscation")] + None, + ); + concentrator.set_additional_metric_tags_cardinality_limit(1); + for span in &spans { + concentrator.add_span(span); + } + + let flushtime = + now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64); + let buckets = concentrator.flush(flushtime, false); + let admitted = buckets[0] + .stats + .iter() + .find(|s| s.additional_metric_tags == vec!["region:us-east-1"]) + .expect("admitted entry should exist"); + // spans 1 and 3 both have region:us-east-1 and the same agg key, so hits == 2. + assert_eq!(admitted.hits, 2); +} + +#[test] +fn test_additional_metric_tags_cardinality_limit_resets_on_flush() { + // After flushing, a new bucket starts with a fresh cap budget. + let now = SystemTime::now(); + let meta_a = [("region", "us-east-1")]; + let meta_b = [("region", "eu-west-1")]; + let mut spans = vec![ + get_test_span_with_meta(now, 1, 0, 100, 5, "svc", "GET /a", 0, &meta_a, &[("_dd.measured", 1.0)]), + get_test_span_with_meta(now, 2, 0, 100, 5, "svc", "GET /b", 0, &meta_b, &[("_dd.measured", 1.0)]), + ]; + compute_top_level_span(spans.as_mut_slice()); + + let mut concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + vec!["region".to_string()], + #[cfg(feature = "stats-obfuscation")] + None, + ); + concentrator.set_additional_metric_tags_cardinality_limit(1); + + // First bucket: cap of 1 means only us-east-1 is admitted unmasked. + for span in &spans { + concentrator.add_span(span); + } + let flush1_time = + now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64); + let _first_buckets = concentrator.flush(flush1_time, true); + + // Second bucket: cap resets; eu-west-1 can now be admitted unmasked. + let later = flush1_time + Duration::from_nanos(BUCKET_SIZE); + let meta_b2 = [("region", "eu-west-1")]; + let mut spans2 = vec![get_test_span_with_meta( + later, 4, 0, 100, 5, "svc", "GET /b", 0, &meta_b2, &[("_dd.measured", 1.0)], + )]; + compute_top_level_span(spans2.as_mut_slice()); + concentrator.add_span(&spans2[0]); + + let flush2_time = later + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64); + let second_buckets = concentrator.flush(flush2_time, true); + let tags: Vec<&str> = second_buckets + .iter() + .flat_map(|b| b.stats.iter()) + .flat_map(|s| s.additional_metric_tags.iter().map(String::as_str)) + .collect(); + assert!( + tags.contains(&"region:eu-west-1"), + "eu-west-1 should be admitted unmasked in a fresh bucket, got: {tags:?}" + ); +} + #[test] fn test_additional_metric_tag_value_length_cap_substitutes_blocked_value() { let now = SystemTime::now();