Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions datadog-ipc/src/shm_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,8 @@ impl ShmSpanConcentrator {
}

impl FlushableConcentrator for ShmSpanConcentrator {
fn flush_buckets(&mut self, force: bool) -> Vec<pb::ClientStatsBucket> {
self.drain_buckets(force)
fn flush_buckets(&mut self, force: bool) -> (Vec<pb::ClientStatsBucket>, u64) {
(self.drain_buckets(force), 0)
}
}

Expand Down
2 changes: 1 addition & 1 deletion datadog-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ datadog-sidecar-macros = { path = "../datadog-sidecar-macros" }
libdd-telemetry = { path = "../libdd-telemetry", features = ["tracing"] }
libdd-data-pipeline = { path = "../libdd-data-pipeline" }
libdd-trace-utils = { path = "../libdd-trace-utils" }
libdd-trace-stats = { path = "../libdd-trace-stats", default-features=false, features = ["https"] }
libdd-trace-stats = { path = "../libdd-trace-stats", features = ["telemetry", "dogstatsd"] }
libdd-remote-config = { path = "../libdd-remote-config" }
datadog-live-debugger = { path = "../datadog-live-debugger" }
datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics"] }
Expand Down
2 changes: 2 additions & 0 deletions datadog-sidecar/src/service/stats_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ fn make_exporter(
)),
#[cfg(feature = "stats-obfuscation")]
"0",
None,
None,
Comment on lines +122 to +123

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we not want to benefit from that telemetry in PHP?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably do I just wanted to limit the number of crates impacted by this PR (also why I enabled it in a separate PR for data-pipeline) but I can enable it for php too

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do so :-)
I definitely won't have this telemetry on my radar myself afterwards

)
}

Expand Down
4 changes: 2 additions & 2 deletions libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime", de
libdd-telemetry = { version = "5.0.1", path = "../libdd-telemetry", default-features = false, optional = true}
libdd-trace-protobuf = { version = "3.0.2", path = "../libdd-trace-protobuf" }
libdd-trace-normalization = { version = "2.0.0", path = "../libdd-trace-normalization" }
libdd-trace-stats = { version = "5.0.0", path = "../libdd-trace-stats", default-features = false }
libdd-trace-stats = { version = "5.0.0", path = "../libdd-trace-stats", default-features = false, features = ["dogstatsd"] }
libdd-trace-utils = { version = "8.0.0", path = "../libdd-trace-utils", default-features = false }
libdd-trace-obfuscation = { version = "4.0.0", path = "../libdd-trace-obfuscation", default-features = false, optional = true }
libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" }
Expand Down Expand Up @@ -86,7 +86,7 @@ duplicate = "2.0.1"

[features]
default = ["https", "telemetry"]
telemetry = ["libdd-telemetry"]
telemetry = ["libdd-telemetry", "libdd-trace-stats/telemetry"]
https = [
"libdd-common/https",
"libdd-capabilities-impl/https",
Expand Down
2 changes: 1 addition & 1 deletion libdd-data-pipeline/src/otlp/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ pub struct OtlpStatsExporter<C: HttpClientCapability + SleepCapability> {
impl<C: HttpClientCapability + SleepCapability> OtlpStatsExporter<C> {
/// Flush the concentrator and export stats; returns `Ok(true)` if anything was sent.
async fn send(&self, force_flush: bool, max_retries: u32) -> anyhow::Result<bool> {
let buckets = {
let (buckets, _collapsed_count) = {
#[allow(clippy::unwrap_used)]
let mut c = self.concentrator.lock().unwrap();
c.flush_with_otlp_exact(SystemTime::now(), force_flush)
Expand Down
3 changes: 3 additions & 0 deletions libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ fn create_and_start_stats_worker<
client_side_stats.obfuscation_config.clone(),
#[cfg(feature = "stats-obfuscation")]
SUPPORTED_OBFUSCATION_VERSION_STR,
#[cfg(feature = "telemetry")]
None,
None,
Comment thread
VianneyRuhlmann marked this conversation as resolved.
);
let worker_handle = ctx
.shared_runtime
Expand Down
6 changes: 5 additions & 1 deletion libdd-trace-stats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ anyhow = "1.0"
libdd-capabilities = { path = "../libdd-capabilities", version = "2.0.0" }
libdd-common = { version = "5.0.0", path = "../libdd-common", default-features = false }
libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" }
libdd-dogstatsd-client = { version = "3.0.0", path = "../libdd-dogstatsd-client", default-features = false, optional = true }
libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime", default-features = false }
libdd-telemetry = { version = "5.0.1", path = "../libdd-telemetry", default-features = false, optional = true }
libdd-trace-protobuf = { version = "3.0.2", path = "../libdd-trace-protobuf" }
libdd-trace-obfuscation = { version = "4.0.0", path = "../libdd-trace-obfuscation", default-features = false }
libdd-trace-utils = { version = "8.0.0", path = "../libdd-trace-utils", default-features = false }
Expand Down Expand Up @@ -49,5 +51,7 @@ tokio = { version = "1.23", features = ["rt-multi-thread", "macros", "test-util"
[features]
default = ["https"]
stats-obfuscation = []
https = ["libdd-common/https", "libdd-capabilities-impl/https", "libdd-shared-runtime/https"]
telemetry = ["libdd-telemetry"]
dogstatsd = ["libdd-dogstatsd-client"]
https = ["libdd-common/https", "libdd-capabilities-impl/https", "libdd-shared-runtime/https","libdd-telemetry?/https","libdd-dogstatsd-client?/https" ]
fips = ["libdd-common/fips", "libdd-capabilities-impl/fips", "libdd-shared-runtime/fips"]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is fips support missing for the added crates?

29 changes: 22 additions & 7 deletions libdd-trace-stats/src/span_concentrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ pub use stat_span::StatSpan;
/// `StatsExporter` is generic over `C: FlushableConcentrator` so it can work with
/// both the in-process [`SpanConcentrator`] and the SHM-backed `ShmSpanConcentrator`.
pub trait FlushableConcentrator {
fn flush_buckets(&mut self, force: bool) -> Vec<pb::ClientStatsBucket>;
/// Flush time buckets and return them together with the number of spans that were
/// collapsed into the overflow sentinel bucket due to cardinality limiting.
fn flush_buckets(&mut self, force: bool) -> (Vec<pb::ClientStatsBucket>, u64);
}

impl FlushableConcentrator for SpanConcentrator {
fn flush_buckets(&mut self, force: bool) -> Vec<pb::ClientStatsBucket> {
fn flush_buckets(&mut self, force: bool) -> (Vec<pb::ClientStatsBucket>, u64) {
self.flush(SystemTime::now(), force)
}
}
Expand Down Expand Up @@ -227,14 +229,26 @@ impl SpanConcentrator {

/// Flush all stats bucket except for the `buffer_len` most recent. If `force` is true, flush
/// all buckets.
pub fn flush(&mut self, now: SystemTime, force: bool) -> Vec<pb::ClientStatsBucket> {
///
/// Returns a tuple of `(buckets, collapsed_spans)` where `collapsed_spans` is the total number
/// of spans that were collapsed into the overflow sentinel bucket due to cardinality limiting
/// across all flushed time buckets.
pub fn flush(&mut self, now: SystemTime, force: bool) -> (Vec<pb::ClientStatsBucket>, u64) {
self.drain_due_buckets(now, force, StatsBucket::flush)
}

/// Like [`Self::flush`], but also emits exact per-cell scalars alongside each bucket for the
/// OTLP trace-metrics path. The protobuf bucket inside each [`OtlpStatsBucket`] is identical
/// to what [`Self::flush`] would produce, so the /v0.6/stats agent path is unaffected.
pub fn flush_with_otlp_exact(&mut self, now: SystemTime, force: bool) -> Vec<OtlpStatsBucket> {
///
/// Returns a tuple of `(buckets, collapsed_spans)` where `collapsed_spans` is the total number
/// of spans that were collapsed into the overflow sentinel bucket due to cardinality limiting
/// across all flushed time buckets.
pub fn flush_with_otlp_exact(
&mut self,
now: SystemTime,
force: bool,
) -> (Vec<OtlpStatsBucket>, u64) {
self.drain_due_buckets(now, force, StatsBucket::flush_with_otlp_exact)
}

Expand All @@ -243,7 +257,7 @@ impl SpanConcentrator {
now: SystemTime,
force: bool,
encode: impl Fn(StatsBucket, u64) -> T,
) -> Vec<T> {
) -> (Vec<T>, u64) {
// TODO: Wait for HashMap::extract_if to be stabilized to avoid a full drain
let now_timestamp = system_time_to_unix_duration(now).as_nanos() as u64;
let buckets: Vec<(u64, StatsBucket)> = self.buckets.drain().collect();
Expand All @@ -254,7 +268,7 @@ impl SpanConcentrator {
- (self.buffer_len as u64 - 1) * self.bucket_size
};
let mut total_collapsed = 0;
buckets
let buckets_pb = buckets
.into_iter()
.filter_map(|(timestamp, bucket)| {
// Always keep `bufferLen` buckets (default is 2: current + previous one).
Expand All @@ -272,7 +286,8 @@ impl SpanConcentrator {
total_collapsed += bucket.collapsed_count();
Some(encode(bucket, self.bucket_size))
})
.collect()
.collect();
(buckets_pb, total_collapsed)
}
}

Expand Down
44 changes: 22 additions & 22 deletions libdd-trace-stats/src/span_concentrator/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ fn test_concentrator_oldest_timestamp_cold() {

// Assert we didn't insert spans in older buckets
for _ in 0..concentrator.buffer_len {
let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);
assert_eq!(stats.len(), 0, "We should get 0 time buckets");
flushtime += Duration::from_nanos(concentrator.bucket_size);
}

let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);

assert_eq!(stats.len(), 1, "We should get exactly one time bucket");

Expand Down Expand Up @@ -188,12 +188,12 @@ fn test_concentrator_oldest_timestamp_hot() {
}

for _ in 0..(concentrator.buffer_len - 1) {
let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);
assert!(stats.is_empty(), "We should get 0 time buckets");
flushtime += Duration::from_nanos(concentrator.bucket_size);
}

let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);
assert_eq!(stats.len(), 1, "We should get exactly one time bucket");

// First oldest bucket aggregates, it should have it all except the
Expand All @@ -213,7 +213,7 @@ fn test_concentrator_oldest_timestamp_hot() {
assert_counts_equal(expected, stats.first().unwrap().stats.clone());

flushtime += Duration::from_nanos(concentrator.bucket_size);
let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);
assert_eq!(stats.len(), 1, "We should get exactly one time bucket");

// Stats of the last four spans.
Expand Down Expand Up @@ -278,7 +278,7 @@ fn test_concentrator_stats_totals() {
let mut flushtime = now;

for _ in 0..=concentrator.buffer_len {
let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);
if stats.is_empty() {
continue;
}
Expand Down Expand Up @@ -528,7 +528,7 @@ fn test_concentrator_stats_counts() {
let mut flushtime = now;

for _ in 0..=concentrator.buffer_len + 2 {
let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);
let expected_flushed_timestamps = align_timestamp(
system_time_to_unix_duration(flushtime).as_nanos() as u64,
concentrator.bucket_size,
Expand All @@ -553,7 +553,7 @@ fn test_concentrator_stats_counts() {
stats.first().unwrap().stats.clone(),
);

let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);
assert_eq!(
stats.len(),
0,
Expand Down Expand Up @@ -658,7 +658,7 @@ fn test_span_should_be_included_in_stats() {
},
];

let stats = concentrator.flush(
let (stats, _) = concentrator.flush(
now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64),
false,
);
Expand Down Expand Up @@ -696,7 +696,7 @@ fn test_ignore_partial_spans() {
concentrator.add_span(span);
}

let stats = concentrator.flush(
let (stats, _) = concentrator.flush(
now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64),
false,
);
Expand Down Expand Up @@ -727,10 +727,10 @@ fn test_force_flush() {
let flushtime = now - Duration::from_secs(3600);

// Bucket should not be flushed without force flush
let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);
assert_eq!(0, stats.len());

let stats = concentrator.flush(flushtime, true);
let (stats, _) = concentrator.flush(flushtime, true);
assert_eq!(1, stats.len());
}

Expand Down Expand Up @@ -900,7 +900,7 @@ fn test_peer_tags_aggregation() {
},
];

let stats_with_peer_tags = concentrator_with_peer_tags.flush(flushtime, false);
let (stats_with_peer_tags, _) = concentrator_with_peer_tags.flush(flushtime, false);
assert_counts_equal(
expected_with_peer_tags,
stats_with_peer_tags
Expand All @@ -910,7 +910,7 @@ fn test_peer_tags_aggregation() {
.clone(),
);

let stats_without_peer_tags = concentrator_without_peer_tags.flush(flushtime, false);
let (stats_without_peer_tags, _) = concentrator_without_peer_tags.flush(flushtime, false);
assert_counts_equal(
expected_without_peer_tags,
stats_without_peer_tags
Expand Down Expand Up @@ -1023,7 +1023,7 @@ fn test_peer_tags_quantization_aggregation() {
..Default::default()
}];

let stats_with_peer_tags = concentrator_with_peer_tags.flush(flushtime, false);
let (stats_with_peer_tags, _) = concentrator_with_peer_tags.flush(flushtime, false);
assert_counts_equal(
expected_with_peer_tags,
stats_with_peer_tags
Expand Down Expand Up @@ -1193,7 +1193,7 @@ fn test_base_service_peer_tag() {
},
];

let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);
assert_counts_equal(
expected,
stats
Expand Down Expand Up @@ -1497,7 +1497,7 @@ fn test_pb_span() {
// Flush and get stats
let flushtime =
now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64);
let stats = concentrator.flush(flushtime, false);
let (stats, _) = concentrator.flush(flushtime, false);

assert_eq!(stats.len(), 1, "Should get exactly one time bucket");
let bucket = &stats[0];
Expand Down Expand Up @@ -1610,7 +1610,7 @@ fn test_flush_with_otlp_exact_per_cell_scalars() {
concentrator.add_span(s);
}

let flushed = concentrator.flush_with_otlp_exact(now, true);
let flushed = concentrator.flush_with_otlp_exact(now, true).0;
assert_eq!(flushed.len(), 1);
let b = &flushed[0];
assert_eq!(b.exact.len(), 1);
Expand Down Expand Up @@ -1676,7 +1676,7 @@ fn test_cardinality_limit_collapse() {
concentrator.add_span(&span);
}

let buckets = concentrator.flush(SystemTime::now(), true);
let (buckets, _) = concentrator.flush(SystemTime::now(), true);
assert!(!buckets.is_empty(), "should get at least one time bucket");

let stats = &buckets[0].stats;
Expand Down Expand Up @@ -1734,7 +1734,7 @@ fn test_overflow_bucket_counts() {
concentrator.add_span(&span);
}

let buckets = concentrator.flush(SystemTime::now(), true);
let (buckets, _) = concentrator.flush(SystemTime::now(), true);
assert!(!buckets.is_empty());
let stats = &buckets[0].stats;

Expand Down Expand Up @@ -1783,7 +1783,7 @@ fn test_no_collapse_within_limit() {
concentrator.add_span(&span);
}

let buckets = concentrator.flush(SystemTime::now(), true);
let (buckets, _) = concentrator.flush(SystemTime::now(), true);
assert!(!buckets.is_empty());
let stats = &buckets[0].stats;

Expand Down Expand Up @@ -1835,7 +1835,7 @@ fn test_overflow_bucket_key_sentinel_values() {
concentrator.add_span(&first);
concentrator.add_span(&second);

let buckets = concentrator.flush(SystemTime::now(), true);
let (buckets, _) = concentrator.flush(SystemTime::now(), true);
assert!(!buckets.is_empty());
let stats = &buckets[0].stats;

Expand Down
Loading
Loading