Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
5b07491
feat(data_pipeline): Add pausable worker module
VianneyRuhlmann May 26, 2025
7e37d00
feat(ddtelemetry): Implement Debug for worker
VianneyRuhlmann May 27, 2025
2eabbed
feat(data_pipeline): Implement debug for agent_info fetcher
VianneyRuhlmann May 27, 2025
8b79aea
feat(ddtelemetry): Expose worker to be used in PausableWorker
VianneyRuhlmann May 27, 2025
590a60a
feat(data_pipeline): Remove unused mutability in stats exporter
VianneyRuhlmann May 27, 2025
736916b
feat(data-pipeline): Add internal error
VianneyRuhlmann May 27, 2025
efc38ea
feat(data-pipeline): Use PausableWorker in trace exporter
VianneyRuhlmann May 27, 2025
f7e5c6e
fix(lint): Add file headers
VianneyRuhlmann May 27, 2025
9a213ea
fix(data-pipeline): Make PausableWorker private
VianneyRuhlmann May 27, 2025
ad49ce7
fix(data-pipeline): Empty message queue in test
VianneyRuhlmann May 27, 2025
da6dee5
fix(data-pipeline): Apply suggestions
VianneyRuhlmann Jun 6, 2025
845781d
Merge branch 'main' into vianney/data-pipeline/add-threads-shutdown
VianneyRuhlmann Jun 6, 2025
104d270
fix(worker): fix formatting
VianneyRuhlmann Jun 6, 2025
17ba3b8
fix(PausableWorker): fix doc test
VianneyRuhlmann Jun 6, 2025
e507732
fix(pausable_worker): fix flaky test
VianneyRuhlmann Jun 10, 2025
f403d1e
feat(pausable_worker): Implement core Error trait
VianneyRuhlmann Jun 10, 2025
5b721f5
Merge branch 'main' into vianney/data-pipeline/add-threads-shutdown
VianneyRuhlmann Jun 10, 2025
a85c21c
fix(pausable_worker): fix clippy lints
VianneyRuhlmann Jun 10, 2025
87d96ad
Merge branch 'main' into vianney/data-pipeline/add-threads-shutdown
VianneyRuhlmann Jun 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion data-pipeline-ffi/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use data_pipeline::trace_exporter::error::{
AgentErrorKind, BuilderErrorKind, NetworkErrorKind, TraceExporterError,
AgentErrorKind, BuilderErrorKind, InternalErrorKind, NetworkErrorKind, TraceExporterError,
};
use std::ffi::{c_char, CString};
use std::fmt::Display;
Expand Down Expand Up @@ -32,6 +32,7 @@ pub enum ExporterErrorCode {
NetworkUnknown,
Serde,
TimedOut,
Internal,
}

impl Display for ExporterErrorCode {
Expand All @@ -57,6 +58,7 @@ impl Display for ExporterErrorCode {
Self::NetworkUnknown => write!(f, "Unknown network error"),
Self::Serde => write!(f, "Serialization/Deserialization error"),
Self::TimedOut => write!(f, "Operation timed out"),
Self::Internal => write!(f, "Internal error"),
}
}
}
Expand Down Expand Up @@ -89,6 +91,9 @@ impl From<TraceExporterError> for ExporterError {
BuilderErrorKind::InvalidTelemetryConfig => ExporterErrorCode::InvalidArgument,
BuilderErrorKind::InvalidConfiguration(_) => ExporterErrorCode::InvalidArgument,
},
TraceExporterError::Internal(e) => match e {
InternalErrorKind::InvalidWorkerState(_) => ExporterErrorCode::Internal,
},
TraceExporterError::Deserialization(_) => ExporterErrorCode::Serde,
TraceExporterError::Io(e) => match e.kind() {
IoErrorKind::InvalidData => ExporterErrorCode::InvalidData,
Expand Down
27 changes: 15 additions & 12 deletions data-pipeline/src/agent_info/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
use super::{schema::AgentInfo, AgentInfoArc};
use anyhow::{anyhow, Result};
use arc_swap::ArcSwapOption;
use ddcommon::hyper_migration;
use ddcommon::Endpoint;
use ddcommon::{hyper_migration, worker::Worker, Endpoint};
use http_body_util::BodyExt;
use hyper::{self, body::Buf, header::HeaderName};
use std::sync::Arc;
Expand Down Expand Up @@ -96,12 +95,13 @@ pub async fn fetch_info(info_endpoint: &Endpoint) -> Result<Box<AgentInfo>> {
/// # Example
/// ```no_run
/// # use anyhow::Result;
/// # use ddcommon::worker::Worker;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// // Define the endpoint
/// let endpoint = ddcommon::Endpoint::from_url("http://localhost:8126/info".parse().unwrap());
/// // Create the fetcher
/// let fetcher = data_pipeline::agent_info::AgentInfoFetcher::new(
/// let mut fetcher = data_pipeline::agent_info::AgentInfoFetcher::new(
/// endpoint,
/// std::time::Duration::from_secs(5 * 60),
/// );
Expand All @@ -122,6 +122,7 @@ pub async fn fetch_info(info_endpoint: &Endpoint) -> Result<Box<AgentInfo>> {
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct AgentInfoFetcher {
info_endpoint: Endpoint,
info: AgentInfoArc,
Expand All @@ -139,11 +140,20 @@ impl AgentInfoFetcher {
}
}

/// Return an AgentInfoArc storing the info received by the agent.
///
/// When the fetcher is running it updates the AgentInfoArc when the agent's info changes.
pub fn get_info(&self) -> AgentInfoArc {
self.info.clone()
}
}

impl Worker for AgentInfoFetcher {
/// Start fetching the info endpoint with the given interval.
///
/// # Warning
/// This method does not return and should be called within a dedicated task.
pub async fn run(&self) {
async fn run(&mut self) {
loop {
let current_info = self.info.load();
let current_hash = current_info.as_ref().map(|info| info.state_hash.as_str());
Expand All @@ -163,13 +173,6 @@ impl AgentInfoFetcher {
sleep(self.refresh_interval).await;
}
}

/// Return an AgentInfoArc storing the info received by the agent.
///
/// When the fetcher is running it updates the AgentInfoArc when the agent's info changes.
pub fn get_info(&self) -> AgentInfoArc {
self.info.clone()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -328,7 +331,7 @@ mod tests {
})
.await;
let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());
let fetcher = AgentInfoFetcher::new(endpoint.clone(), Duration::from_millis(100));
let mut fetcher = AgentInfoFetcher::new(endpoint.clone(), Duration::from_millis(100));
let info = fetcher.get_info();
assert!(info.load().is_none());
tokio::spawn(async move {
Expand Down
1 change: 1 addition & 0 deletions data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

pub mod agent_info;
mod health_metrics;
mod pausable_worker;
#[allow(missing_docs)]
pub mod span_concentrator;
#[allow(missing_docs)]
Expand Down
172 changes: 172 additions & 0 deletions data-pipeline/src/pausable_worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Defines a pausable worker to be able to stop background processes before forks

use ddcommon::worker::Worker;
use std::fmt::Display;
use tokio::{
runtime::Runtime,
select,
task::{JoinError, JoinHandle},
};
use tokio_util::sync::CancellationToken;

/// A pausable worker which can be paused and restarted on forks.
///
/// Used to allow a [`ddcommon::worker::Worker`] to be paused while saving its state when dropping
/// a tokio runtime to be able to restart with the same state on a new runtime. This is used to
/// stop all threads before a fork to avoid deadlocks in child.
///
/// # Time-to-pause
/// This loop should yield regularly to reduce time-to-pause. See [`tokio::task::yield_now`].
///
/// # Cancellation safety
/// The main loop can be interrupted at any yield point (`.await`ed call). The state of the worker
/// at this point will be saved and used to restart the worker. To be able to safely restart, the
/// worker must be in a valid state on every call to `.await`.
/// See [`tokio::select#cancellation-safety`] for more details.
#[derive(Debug)]
pub enum PausableWorker<T: Worker + Send + Sync + 'static> {
Running {
handle: JoinHandle<T>,
stop_token: CancellationToken,
},
Paused {
worker: T,
},
InvalidState,
}

#[derive(Debug)]
pub enum PausableWorkerError {
InvalidState,
TaskAborted,
}

impl Display for PausableWorkerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PausableWorkerError::InvalidState => {
write!(f, "Worker is in an invalid state and must be recreated.")
}
PausableWorkerError::TaskAborted => {
write!(f, "Worker task has been aborted and state has been lost.")
}
}
}
}

impl core::error::Error for PausableWorkerError {}

impl<T: Worker + Send + Sync + 'static> PausableWorker<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to impl Drop? What happens if PausableWorker goes out of scope? Are tokio tasks going to continue to run in the background? Should you be canceling the stop_token?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task is going to continue in the background until the runtime is dropped. I think it is fine to not cancel the token. Either way this should be described in the doc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a very strong opinion, but why is it ok for the task to continue? Is it not likely that workers can go out of scope but the runtime stick around?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion either. The way I see it the PausableWorker is more of a handle to the worker which is running in the runtime.

/// Create a new pausable worker from the given worker.
pub fn new(worker: T) -> Self {
Self::Paused { worker }
}

/// Start the worker on the given runtime.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the public functions, should we have more thorough rustdoc comments with examples? Or perhaps, just more thorough rustdoc comment for the module or enum that explains why this is necessary and when it should be used? If someone outside of our team needs to implement a worker in the future, do they have enough information to do so independently?

///
/// The worker's main loop will be run on the runtime.
///
/// # Errors
/// Fails if the worker is in an invalid state.
pub fn start(&mut self, rt: &Runtime) -> Result<(), PausableWorkerError> {
if let Self::Running { .. } = self {
Ok(())
} else if let Self::Paused { mut worker } = std::mem::replace(self, Self::InvalidState) {
// Worker is temporarily in an invalid state, but since this block is failsafe it will
// be replaced by a valid state.
let stop_token = CancellationToken::new();
let cloned_token = stop_token.clone();
let handle = rt.spawn(async move {
select! {
_ = worker.run() => {worker}
_ = cloned_token.cancelled() => {worker}
}
});

*self = PausableWorker::Running { handle, stop_token };
Ok(())
} else {
Err(PausableWorkerError::InvalidState)
}
}

/// Pause the worker saving it's state to be restarted.
///
/// # Errors
/// Fails if the worker handle has been aborted preventing the worker from being retrieved.
pub async fn pause(&mut self) -> Result<(), PausableWorkerError> {
match self {
PausableWorker::Running { handle, stop_token } => {
stop_token.cancel();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a possible race condition here? If the task is already shutting down and pause is called can we wind up in an InvalidState when we don't want to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're safe since pause takes a mutable reference. handle.await() only returns an error if the tokio task has been aborted.

Copy link
Contributor

@ekump ekump Jun 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're safe since pause takes a mutable reference

Yes, which is why I wasn't concerned with pause() being called multiple times. I was more concerned with a potential race condition when pause() is called at the same time that a worker is shutting down outside of the pause workflow. If the task shuts down gracefully in this situation we don't want to be in InvalidState because of timing.

handle.await() only returns an error if the tokio task has been aborted.

Looking at this some more, I agree that we are probably ok. handle.await() is going to return Ok in the scenario I described as long as it's a graceful shutdown. We'll be in an InvalidState with a panic or abort...but that is what we want anyway.

if let Ok(worker) = handle.await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we have workers that don't frequently yield, or wind up deadlocked? Are we going to await forever? Or, if not forever, long enough where it causes problems?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I'll add a warning in doc and a timeout in stop_worker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually tokio timeout won't work as it will be checked when the task yields so we'll have the same issue. I think we can keep the warning for now and add timeout as a follow-up item.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For what it's worth, tokio supports "cancellation tokens", which is how in profiling we stop tokio if folks e.g. ctrl+c and the profiler is uploading a profile on a slow connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pausable worker uses a cancellation token internally but we still have to wait for the worker to yield. I don't think there's a way force the worker to yield earlier though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's a way force the worker to yield earlier though.

I think it's possible, but would require a big refactor and add significant complexity to our workers. I don't think it's worth pursuing until we see evidence this solution isn't sufficient.

I think the only real problem is high-frequency forking apps. We need to support that situation. I guess the only way to do that is to ensure the workers are as efficient as possible and yield appropriately? I don't think there is anything that can be practically done in pause().

*self = PausableWorker::Paused { worker };
Ok(())
} else {
// The task has been aborted and the worker can't be retrieved.
*self = PausableWorker::InvalidState;
Err(PausableWorkerError::TaskAborted)
}
}
PausableWorker::Paused { .. } => Ok(()),
PausableWorker::InvalidState => Err(PausableWorkerError::InvalidState),
}
}

/// Wait for the run method of the worker to exit.
pub async fn join(self) -> Result<(), JoinError> {
if let PausableWorker::Running { handle, .. } = self {
handle.await?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use tokio::{runtime::Builder, time::sleep};

use super::*;
use std::{
sync::mpsc::{channel, Sender},
time::Duration,
};

/// Test worker incrementing the state and sending it with the sender.
struct TestWorker {
state: u32,
sender: Sender<u32>,
}

impl Worker for TestWorker {
async fn run(&mut self) {
loop {
let _ = self.sender.send(self.state);
self.state += 1;
sleep(Duration::from_millis(100)).await;
}
}
}

#[test]
fn test_restart() {
let (sender, receiver) = channel::<u32>();
let worker = TestWorker { state: 0, sender };
let runtime = Builder::new_multi_thread().enable_time().build().unwrap();
let mut pausable_worker = PausableWorker::new(worker);

pausable_worker.start(&runtime).unwrap();

assert_eq!(receiver.recv().unwrap(), 0);
runtime.block_on(async { pausable_worker.pause().await.unwrap() });
// Empty the message queue and get the last message
let mut next_message = 1;
for message in receiver.try_iter() {
next_message = message + 1;
}
pausable_worker.start(&runtime).unwrap();
assert_eq!(receiver.recv().unwrap(), next_message);
}
}
6 changes: 4 additions & 2 deletions data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
use crate::{span_concentrator::SpanConcentrator, trace_exporter::TracerMetadata};
use datadog_trace_protobuf::pb;
use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy};
use ddcommon::Endpoint;
use ddcommon::{worker::Worker, Endpoint};
use hyper;
use tokio::select;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -127,13 +127,15 @@ impl StatsExporter {
.flush(time::SystemTime::now(), force_flush),
)
}
}

impl Worker for StatsExporter {
/// Run loop of the stats exporter
///
/// Once started, the stats exporter will flush and send stats on every `self.flush_interval`.
/// If the `self.cancellation_token` is cancelled, the exporter will force flush all stats and
/// return.
pub async fn run(&mut self) {
async fn run(&mut self) {
loop {
select! {
_ = self.cancellation_token.cancelled() => {
Expand Down
Loading
Loading