Skip to content

Commit 100d8cc

Browse files
feat(writer): implement native writer [APMSP-1875] (#13071)
## TLDR This PR adds a new Writer which uses the trace exporter from libdatadog to send traces to the agent. ## Description - Add PyO3 bindings to expose the Rust TraceExporter to Python - Add a new NativeWriter using the bindings - Add a config option to toggle between the AgentWriter and the NativeWriter - Update tests accessing private methods/attributes of the writer to support both implementations - Register fork hooks for the writer, required because the trace exporter relies on a multi-threaded runtime ## Motivation This PR will allow dogfooding of the Trace Exporter in dd-trace-py ## Testing The full test suite has been run with the NativeWriter enabled by default; when it is disabled, only writer tests and integration tests are run on the NativeWriter. ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - 
Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: Brett Langdon <[email protected]>
1 parent 198b883 commit 100d8cc

File tree

27 files changed

+1871
-251
lines changed

27 files changed

+1871
-251
lines changed

.gitlab/benchmarks/bp-runner.microbenchmarks.fail-on-breach.yml

Lines changed: 94 additions & 93 deletions
Large diffs are not rendered by default.

.riot/requirements/1550212.txt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#
2+
# This file is autogenerated by pip-compile with Python 3.13
3+
# by the following command:
4+
#
5+
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1550212.in
6+
#
7+
attrs==25.3.0
8+
coverage[toml]==7.10.1
9+
hypothesis==6.45.0
10+
iniconfig==2.1.0
11+
mock==5.2.0
12+
msgpack==1.1.1
13+
opentracing==2.4.0
14+
packaging==25.0
15+
pluggy==1.6.0
16+
pygments==2.19.2
17+
pytest==8.4.1
18+
pytest-cov==6.2.1
19+
pytest-mock==3.14.1
20+
pytest-randomly==3.16.0
21+
sortedcontainers==2.4.0

ddtrace/_trace/processor/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
160160
if root_ctx and root_ctx.sampling_priority is None:
161161
self.sampler.sample(trace[0]._local_root)
162162
# When stats computation is enabled in the tracer then we can
163-
# safely drop the traces.
164-
if self._compute_stats_enabled and not self.apm_opt_out:
163+
# safely drop the traces. When using the NativeWriter this is handled by native code.
164+
if not config._trace_writer_native and self._compute_stats_enabled and not self.apm_opt_out:
165165
priority = root_ctx.sampling_priority if root_ctx is not None else None
166166
if priority is not None and priority <= 0:
167167
# When any span is marked as keep by a single span sampling

ddtrace/_trace/tracer.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ def _default_span_processors_factory(
119119

120120
span_processors.append(AppSecIastSpanProcessor())
121121

122-
if config._trace_compute_stats:
122+
# When using the NativeWriter stats are computed by the native code.
123+
if config._trace_compute_stats and not config._trace_writer_native:
123124
# Inline the import to avoid pulling in ddsketch or protobuf
124125
# when importing ddtrace.
125126
from ddtrace.internal.processor.stats import SpanStatsProcessorV06
@@ -264,6 +265,8 @@ def sample(self, span):
264265
self._sampler.sample(span)
265266

266267
def _sample_before_fork(self) -> None:
268+
if isinstance(self._span_aggregator.writer, AgentWriterInterface):
269+
self._span_aggregator.writer.before_fork()
267270
span = self.current_root_span()
268271
if span is not None and span.context.sampling_priority is None:
269272
self.sample(span)

ddtrace/internal/native/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,18 @@
33
from typing import Optional
44
from typing import Tuple
55

6+
from ._native import AgentError # noqa: F401
7+
from ._native import BuilderError # noqa: F401
68
from ._native import DDSketch # noqa: F401
9+
from ._native import DeserializationError # noqa: F401
10+
from ._native import IoError # noqa: F401
11+
from ._native import NetworkError # noqa: F401
712
from ._native import PyConfigurator
813
from ._native import PyTracerMetadata # noqa: F401
14+
from ._native import RequestError # noqa: F401
15+
from ._native import SerializationError # noqa: F401
16+
from ._native import TraceExporter # noqa: F401
17+
from ._native import TraceExporterBuilder # noqa: F401
918
from ._native import store_metadata # noqa: F401
1019

1120

ddtrace/internal/native/_native.pyi

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,244 @@ def store_metadata(data: PyTracerMetadata) -> PyAnonymousFileHandle:
139139
:param data: The tracer configuration to store.
140140
"""
141141
...
142+
143+
class TraceExporter:
144+
"""
145+
TraceExporter is a class responsible for exporting traces to the Agent.
146+
"""
147+
148+
def __init__(self):
149+
"""
150+
Initialize a TraceExporter.
151+
"""
152+
...
153+
def send(self, data: bytes, trace_count: int) -> str:
154+
"""
155+
Send a trace payload to the Agent.
156+
:param data: The msgpack encoded trace payload to send.
157+
:param trace_count: The number of traces in the data payload.
158+
"""
159+
...
160+
def shutdown(self, timeout_ns: int) -> None:
161+
"""
162+
Shutdown the TraceExporter, releasing any resources and ensuring all pending stats are sent.
163+
This method should be called before the application exits to ensure proper cleanup.
164+
:param timeout_ns: The maximum time to wait for shutdown in nanoseconds.
165+
"""
166+
...
167+
def drop(self) -> None:
168+
"""
169+
Drop the TraceExporter, releasing any resources without sending pending stats.
170+
"""
171+
...
172+
def run_worker(self) -> None:
173+
"""
174+
Start the rust worker threads.
175+
This starts the runtime required to process rust async tasks including stats and telemetry sending.
176+
The runtime will also be created when calling `send`;
177+
this method can be used to start the runtime before sending any traces.
178+
"""
179+
...
180+
def stop_worker(self) -> None:
181+
"""
182+
Stop the rust worker threads.
183+
This stops the async runtime and must be called before forking to avoid deadlocks after forking.
184+
This should be called even if `run_worker` hasn't been called as the runtime will be started
185+
when calling `send`.
186+
"""
187+
...
188+
def debug(self) -> str:
189+
"""
190+
Returns a string representation of the exporter.
191+
Should only be used for debugging.
192+
"""
193+
...
194+
195+
class TraceExporterBuilder:
196+
"""
197+
TraceExporterBuilder is a class responsible for building a TraceExporter.
198+
"""
199+
200+
def __init__(self):
201+
"""
202+
Initialize a TraceExporterBuilder.
203+
"""
204+
...
205+
def set_hostname(self, hostname: str) -> TraceExporterBuilder:
206+
"""
207+
Set the hostname of the TraceExporter.
208+
:param hostname: The hostname to set for the TraceExporter.
209+
"""
210+
...
211+
def set_url(self, url: str) -> TraceExporterBuilder:
212+
"""
213+
Set the agent url of the TraceExporter.
214+
:param url: The URL of the agent to send traces to.
215+
"""
216+
...
217+
def set_dogstatsd_url(self, url: str) -> TraceExporterBuilder:
218+
"""
219+
Set the DogStatsD URL of the TraceExporter.
220+
:param url: The URL of the DogStatsD endpoint.
221+
"""
222+
...
223+
def set_env(self, env: str) -> TraceExporterBuilder:
224+
"""
225+
Set the env of the TraceExporter.
226+
:param env: The environment name (e.g., 'prod', 'staging', 'dev').
227+
"""
228+
...
229+
def set_app_version(self, version: str) -> TraceExporterBuilder:
230+
"""
231+
Set the app version of the TraceExporter.
232+
:param version: The version string of the application.
233+
"""
234+
...
235+
def set_git_commit_sha(self, git_commit_sha: str) -> TraceExporterBuilder:
236+
"""
237+
Set the git commit sha of the TraceExporter.
238+
:param git_commit_sha: The git commit SHA of the current code version.
239+
"""
240+
...
241+
def set_tracer_version(self, version: str) -> TraceExporterBuilder:
242+
"""
243+
Set the tracer version of the TraceExporter.
244+
:param version: The version string of the tracer.
245+
"""
246+
...
247+
def set_language(self, language: str) -> TraceExporterBuilder:
248+
"""
249+
Set the language of the TraceExporter.
250+
:param language: The programming language being traced (e.g., 'python').
251+
"""
252+
...
253+
def set_language_version(self, version: str) -> TraceExporterBuilder:
254+
"""
255+
Set the language version of the TraceExporter.
256+
:param version: The version string of the programming language.
257+
"""
258+
...
259+
def set_language_interpreter(self, interpreter: str) -> TraceExporterBuilder:
260+
"""
261+
Set the language interpreter of the TraceExporter.
262+
:param interpreter: The language interpreter.
263+
"""
264+
...
265+
def set_language_interpreter_vendor(self, vendor: str) -> TraceExporterBuilder:
266+
"""
267+
Set the language interpreter vendor of the TraceExporter.
268+
:param vendor: The vendor of the language interpreter.
269+
"""
270+
...
271+
def set_test_session_token(self, token: str) -> TraceExporterBuilder:
272+
"""
273+
Set the test session token for the TraceExporter.
274+
:param token: The test session token to use for authentication.
275+
"""
276+
...
277+
def set_input_format(self, input_format: str) -> TraceExporterBuilder:
278+
"""
279+
Set the input format for the trace data.
280+
:param input_format: The format to use for input traces (supported values are "v0.4" and "v0.5").
281+
:raises ValueError: If input_format is not a supported value.
282+
"""
283+
...
284+
def set_output_format(self, output_format: str) -> TraceExporterBuilder:
285+
"""
286+
Set the output format for the trace data.
287+
:param output_format: The format to use for output traces (supported values are "v0.4" and "v0.5").
288+
:raises ValueError: If output_format is not a supported value.
289+
"""
290+
...
291+
def set_client_computed_top_level(self) -> TraceExporterBuilder:
292+
"""
293+
Set the header indicating the tracer has computed the top-level tag.
294+
"""
295+
...
296+
def set_client_computed_stats(self) -> TraceExporterBuilder:
297+
"""
298+
Set the header indicating the tracer has already computed stats.
299+
This should not be used along with `enable_stats`.
300+
The main use is to opt-out trace metrics.
301+
"""
302+
...
303+
def enable_stats(self, bucket_size_ns: int) -> TraceExporterBuilder:
304+
"""
305+
Enable stats computation in the TraceExporter.
306+
:param bucket_size_ns: The size of stats bucket in nanoseconds.
307+
"""
308+
...
309+
def enable_telemetry(
310+
self,
311+
heartbeat_ms: int,
312+
runtime_id: str,
313+
) -> TraceExporterBuilder:
314+
"""
315+
Emit telemetry in the TraceExporter.
316+
:param heartbeat_ms: The flush interval for telemetry metrics in milliseconds.
317+
:param runtime_id: The runtime id to use for telemetry.
318+
"""
319+
...
320+
def build(self) -> TraceExporter:
321+
"""
322+
Build and return a TraceExporter instance with the configured settings.
323+
This method consumes the builder, so it cannot be used again after calling build.
324+
:return: A configured TraceExporter instance.
325+
:raises ValueError: If the builder has already been consumed or if required settings are missing.
326+
"""
327+
...
328+
def debug(self) -> str:
329+
"""
330+
Returns a string representation of the builder.
331+
Should only be used for debugging.
332+
"""
333+
...
334+
335+
class AgentError(Exception):
336+
"""
337+
Raised when there is an error in agent response processing.
338+
"""
339+
340+
...
341+
342+
class BuilderError(Exception):
343+
"""
344+
Raised when there is an error in the TraceExporterBuilder configuration.
345+
"""
346+
347+
...
348+
349+
class DeserializationError(Exception):
350+
"""
351+
Raised when there is an error deserializing trace payload.
352+
"""
353+
354+
...
355+
356+
class IoError(Exception):
357+
"""
358+
Raised when there is an I/O error during trace processing.
359+
"""
360+
361+
...
362+
363+
class NetworkError(Exception):
364+
"""
365+
Raised when there is a network-related error during trace processing.
366+
"""
367+
368+
...
369+
370+
class RequestError(Exception):
371+
"""
372+
Raised when the agent responds with an error code.
373+
"""
374+
375+
...
376+
377+
class SerializationError(Exception):
378+
"""
379+
Raised when there is an error serializing trace payload.
380+
"""
381+
382+
...

ddtrace/internal/writer/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from .writer import AgentWriterInterface
55
from .writer import HTTPWriter
66
from .writer import LogWriter
7+
from .writer import NativeWriter
78
from .writer import Response
89
from .writer import TraceWriter
910
from .writer import _human_size
@@ -18,6 +19,7 @@
1819
"DEFAULT_SMA_WINDOW",
1920
"HTTPWriter",
2021
"LogWriter",
22+
"NativeWriter",
2123
"Response",
2224
"TraceWriter",
2325
"WriterClientBase",

0 commit comments

Comments
 (0)