Skip to content

Commit c802b61

Browse files
feat(data-pipeline): Use PausableWorker in trace exporter
1 parent 077c131 commit c802b61

File tree

2 files changed

+302
-136
lines changed

2 files changed

+302
-136
lines changed

data-pipeline/src/telemetry/mod.rs

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ use datadog_trace_utils::{
1212
};
1313
use ddcommon::tag::Tag;
1414
use ddtelemetry::worker::{
15-
LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerFlavor,
16-
TelemetryWorkerHandle,
15+
LifecycleAction, TelemetryActions, TelemetryWorker, TelemetryWorkerBuilder,
16+
TelemetryWorkerFlavor, TelemetryWorkerHandle,
1717
};
1818
use std::{collections::HashMap, time::Duration};
19-
use tokio::task::JoinHandle;
19+
use tokio::runtime::Handle;
2020

2121
/// Structure to build a Telemetry client.
2222
///
@@ -86,7 +86,10 @@ impl TelemetryClientBuilder {
8686
}
8787

8888
/// Builds the telemetry client.
89-
pub async fn build(self) -> Result<TelemetryClient, TelemetryError> {
89+
pub fn build(
90+
self,
91+
runtime: Handle,
92+
) -> Result<(TelemetryClient, TelemetryWorker), TelemetryError> {
9093
#[allow(clippy::unwrap_used)]
9194
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
9295
self.service_name.unwrap(),
@@ -102,16 +105,17 @@ impl TelemetryClientBuilder {
102105
builder.runtime_id = Some(id);
103106
}
104107

105-
let (worker, handle) = builder
106-
.spawn()
107-
.await
108+
let (worker_handle, worker) = builder
109+
.build_worker(runtime)
108110
.map_err(|e| TelemetryError::Builder(e.to_string()))?;
109111

110-
Ok(TelemetryClient {
111-
handle,
112-
metrics: Metrics::new(&worker),
112+
Ok((
113+
TelemetryClient {
114+
metrics: Metrics::new(&worker_handle),
115+
worker: worker_handle,
116+
},
113117
worker,
114-
})
118+
))
115119
}
116120
}
117121

@@ -120,7 +124,6 @@ impl TelemetryClientBuilder {
120124
pub struct TelemetryClient {
121125
metrics: Metrics,
122126
worker: TelemetryWorkerHandle,
123-
handle: JoinHandle<()>,
124127
}
125128

126129
/// Telemetry describing the sending of a trace payload
@@ -246,26 +249,18 @@ impl TelemetryClient {
246249

247250
/// Starts the client
248251
pub async fn start(&self) {
249-
if let Err(_e) = self
252+
_ = self
250253
.worker
251254
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
252-
.await
253-
{
254-
self.handle.abort();
255-
}
255+
.await;
256256
}
257257

258258
/// Shutdowns the telemetry client.
259259
pub async fn shutdown(self) {
260-
if let Err(_e) = self
260+
_ = self
261261
.worker
262262
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Stop))
263-
.await
264-
{
265-
self.handle.abort();
266-
}
267-
268-
let _ = self.handle.await;
263+
.await;
269264
}
270265
}
271266

@@ -276,21 +271,23 @@ mod tests {
276271
use httpmock::MockServer;
277272
use hyper::{Response, StatusCode};
278273
use regex::Regex;
274+
use tokio::time::sleep;
279275

280276
use super::*;
281277

282278
async fn get_test_client(url: &str) -> TelemetryClient {
283-
TelemetryClientBuilder::default()
279+
let (client, mut worker) = TelemetryClientBuilder::default()
284280
.set_service_name("test_service")
285281
.set_language("test_language")
286282
.set_language_version("test_language_version")
287283
.set_tracer_version("test_tracer_version")
288284
.set_url(url)
289285
.set_heartbeat(100)
290286
.set_debug_enabled(true)
291-
.build()
292-
.await
293-
.unwrap()
287+
.build(Handle::current())
288+
.unwrap();
289+
tokio::spawn(async move { worker.run().await });
290+
client
294291
}
295292

296293
#[test]
@@ -320,15 +317,14 @@ mod tests {
320317
}
321318

322319
#[cfg_attr(miri, ignore)]
323-
#[tokio::test]
320+
#[tokio::test(flavor = "multi_thread")]
324321
async fn spawn_test() {
325322
let client = TelemetryClientBuilder::default()
326323
.set_service_name("test_service")
327324
.set_language("test_language")
328325
.set_language_version("test_language_version")
329326
.set_tracer_version("test_tracer_version")
330-
.build()
331-
.await;
327+
.build(Handle::current());
332328

333329
assert!(client.is_ok());
334330
}
@@ -356,6 +352,9 @@ mod tests {
356352
client.start().await;
357353
let _ = client.send(&data);
358354
client.shutdown().await;
355+
while telemetry_srv.hits_async().await == 0 {
356+
sleep(Duration::from_millis(10)).await;
357+
}
359358
telemetry_srv.assert_hits_async(1).await;
360359
}
361360

@@ -382,6 +381,9 @@ mod tests {
382381
client.start().await;
383382
let _ = client.send(&data);
384383
client.shutdown().await;
384+
while telemetry_srv.hits_async().await == 0 {
385+
sleep(Duration::from_millis(10)).await;
386+
}
385387
telemetry_srv.assert_hits_async(1).await;
386388
}
387389

@@ -408,6 +410,9 @@ mod tests {
408410
client.start().await;
409411
let _ = client.send(&data);
410412
client.shutdown().await;
413+
while telemetry_srv.hits_async().await == 0 {
414+
sleep(Duration::from_millis(10)).await;
415+
}
411416
telemetry_srv.assert_hits_async(1).await;
412417
}
413418

@@ -434,6 +439,9 @@ mod tests {
434439
client.start().await;
435440
let _ = client.send(&data);
436441
client.shutdown().await;
442+
while telemetry_srv.hits_async().await == 0 {
443+
sleep(Duration::from_millis(10)).await;
444+
}
437445
telemetry_srv.assert_hits_async(1).await;
438446
}
439447

@@ -460,6 +468,9 @@ mod tests {
460468
client.start().await;
461469
let _ = client.send(&data);
462470
client.shutdown().await;
471+
while telemetry_srv.hits_async().await == 0 {
472+
sleep(Duration::from_millis(10)).await;
473+
}
463474
telemetry_srv.assert_hits_async(1).await;
464475
}
465476

@@ -486,6 +497,9 @@ mod tests {
486497
client.start().await;
487498
let _ = client.send(&data);
488499
client.shutdown().await;
500+
while telemetry_srv.hits_async().await == 0 {
501+
sleep(Duration::from_millis(10)).await;
502+
}
489503
telemetry_srv.assert_hits_async(1).await;
490504
}
491505

@@ -512,6 +526,9 @@ mod tests {
512526
client.start().await;
513527
let _ = client.send(&data);
514528
client.shutdown().await;
529+
while telemetry_srv.hits_async().await == 0 {
530+
sleep(Duration::from_millis(10)).await;
531+
}
515532
telemetry_srv.assert_hits_async(1).await;
516533
}
517534

@@ -538,6 +555,9 @@ mod tests {
538555
client.start().await;
539556
let _ = client.send(&data);
540557
client.shutdown().await;
558+
while telemetry_srv.hits_async().await == 0 {
559+
sleep(Duration::from_millis(10)).await;
560+
}
541561
telemetry_srv.assert_hits_async(1).await;
542562
}
543563

@@ -675,10 +695,10 @@ mod tests {
675695
.set_url(&server.url("/"))
676696
.set_heartbeat(100)
677697
.set_runtime_id("foo")
678-
.build()
679-
.await;
698+
.build(Handle::current());
680699

681-
let client = result.unwrap();
700+
let (client, mut worker) = result.unwrap();
701+
tokio::spawn(async move { worker.run().await });
682702

683703
client.start().await;
684704
client
@@ -688,6 +708,9 @@ mod tests {
688708
})
689709
.unwrap();
690710
client.shutdown().await;
711+
while telemetry_srv.hits_async().await == 0 {
712+
sleep(Duration::from_millis(10)).await;
713+
}
691714
// One payload generate-metrics
692715
telemetry_srv.assert_hits_async(1).await;
693716
}

0 commit comments

Comments
 (0)