Skip to content

Commit c369549

Browse files
feat(NativeWriter): drop threads before worker
1 parent 6da28c4 commit c369549

File tree

7 files changed

+119
-258
lines changed

7 files changed

+119
-258
lines changed

ddtrace/_trace/tracer.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ def __init__(self) -> None:
278278
atexit.register(self._atexit)
279279
forksafe.register_before_fork(self._sample_before_fork)
280280
forksafe.register(self._child_after_fork)
281+
forksafe.register_after_parent(self._parent_after_fork)
281282

282283
self._shutdown_lock = RLock()
283284

@@ -325,9 +326,10 @@ def deregister_on_start_span(self, func: Callable) -> Callable:
325326
return func
326327

327328
def _sample_before_fork(self) -> None:
328-
span = self.current_root_span()
329329
if isinstance(self._writer, NativeWriter):
330-
self._writer._exporter.drop()
330+
self._writer._exporter.stop_worker()
331+
self._writer._native_worker.join()
332+
span = self.current_root_span()
331333
if span is not None and span.context.sampling_priority is None:
332334
self.sample(span)
333335

@@ -494,6 +496,11 @@ def _child_after_fork(self):
494496
self._recreate()
495497
self._new_process = True
496498

499+
def _parent_after_fork(self):
500+
if isinstance(self._writer, NativeWriter):
501+
pass
502+
# self._writer.start_worker_thread()
503+
497504
def _recreate(self):
498505
"""Re-initialize the tracer's processors and trace writer. This method should only be used in tests."""
499506
# Stop the writer.
@@ -950,6 +957,7 @@ def shutdown(self, timeout: Optional[float] = None) -> None:
950957

951958
atexit.unregister(self._atexit)
952959
forksafe.unregister(self._child_after_fork)
960+
forksafe.unregister(self._parent_after_fork)
953961
forksafe.unregister_before_fork(self._sample_before_fork)
954962

955963
self.start_span = self._start_span_after_shutdown # type: ignore[assignment]

ddtrace/internal/forksafe.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
_registry = [] # type: typing.List[typing.Callable[[], None]]
1818
_registry_before_fork = [] # type: typing.List[typing.Callable[[], None]]
19+
_registry_after_parent = [] # type: typing.List[typing.Callable[[], None]]
1920

2021
# Some integrations might require after-fork hooks to be executed after the
2122
# actual call to os.fork with earlier versions of Python (<= 3.6), else issues
@@ -28,16 +29,6 @@
2829
_forked = False
2930

3031

31-
def set_forked():
32-
global _forked
33-
34-
_forked = True
35-
36-
37-
def has_forked():
38-
return _forked
39-
40-
4132
def run_hooks(registry):
4233
# type: (typing.List[typing.Callable[[], None]]) -> None
4334
for hook in list(registry):
@@ -50,6 +41,17 @@ def run_hooks(registry):
5041

5142
ddtrace_before_fork = functools.partial(run_hooks, _registry_before_fork)
5243
ddtrace_after_in_child = functools.partial(run_hooks, _registry)
44+
ddtrace_after_in_parent = functools.partial(run_hooks, _registry_after_parent)
45+
46+
47+
def set_forked():
48+
global _forked
49+
50+
_forked = True
51+
52+
53+
def has_forked():
54+
return _forked
5355

5456

5557
def register_hook(registry, hook):
@@ -59,6 +61,9 @@ def register_hook(registry, hook):
5961

6062
register_before_fork = functools.partial(register_hook, _registry_before_fork)
6163
register = functools.partial(register_hook, _registry)
64+
register_after_parent = functools.partial(register_hook, _registry_after_parent)
65+
66+
register_after_parent(set_forked)
6267

6368

6469
def unregister(after_in_child):
@@ -69,6 +74,14 @@ def unregister(after_in_child):
6974
log.info("after_in_child hook %s was unregistered without first being registered", after_in_child.__name__)
7075

7176

77+
def unregister_parent(after_in_parent):
78+
# type: (typing.Callable[[], None]) -> None
79+
try:
80+
_registry.remove(after_in_parent)
81+
except ValueError:
82+
log.info("after_in_parent hook %s was unregistered without first being registered", after_in_parent.__name__)
83+
84+
7285
def unregister_before_fork(before_fork):
7386
# type: (typing.Callable[[], None]) -> None
7487
try:
@@ -78,7 +91,9 @@ def unregister_before_fork(before_fork):
7891

7992

8093
if hasattr(os, "register_at_fork"):
81-
os.register_at_fork(before=ddtrace_before_fork, after_in_child=ddtrace_after_in_child, after_in_parent=set_forked)
94+
os.register_at_fork(
95+
before=ddtrace_before_fork, after_in_child=ddtrace_after_in_child, after_in_parent=ddtrace_after_in_parent
96+
)
8297

8398
_resetable_objects = weakref.WeakSet() # type: weakref.WeakSet[ResetObject]
8499

ddtrace/internal/native/_native.pyi

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ class TraceExporter:
8686
"""
8787
...
8888

89-
def drop(self) -> None:...
89+
def drop(self) -> None: ...
90+
def run_worker(self) -> None: ...
91+
def stop_worker(self) -> None: ...
9092

9193
class TraceExporterBuilder:
9294
"""

ddtrace/internal/writer/writer.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from ..agent import get_connection
3232
from ..constants import _HTTPLIB_NO_TRACE_REQUEST
3333
from ..encoding import JSONEncoderV2
34+
from ..gitmetadata import get_git_tags
3435
from ..logger import get_logger
3536
from ..serverless import in_azure_function
3637
from ..serverless import in_gcp_function
@@ -671,18 +672,20 @@ def __init__(
671672
self._compute_stats_enabled = compute_stats_enabled
672673
self._response_cb = response_callback
673674
self._exporter = self._create_exporter()
675+
self.start_worker_thread()
676+
677+
def start_worker_thread(self):
678+
self._native_worker = threading.Thread(target=self._exporter.run_worker, daemon=True)
679+
self._native_worker.start()
674680

675681
def _create_exporter(self) -> native.TraceExporter:
676682
"""
677683
Create a new TraceExporter with the current configuration.
678684
:return: A configured TraceExporter instance.
679685
"""
680-
# Shutdown the existing exporter if it exists
681-
if hasattr(self, "_exporter") and self._exporter is not None:
682-
self._exporter.shutdown(3_000_000_000) # 3 seconds timeout
683-
684686
stats_interval = float(os.getenv("_DD_TRACE_STATS_WRITER_INTERVAL") or 10.0)
685687
bucket_size_ns = int(stats_interval * 1e9) # type: int
688+
_, commit_sha, _ = get_git_tags()
686689

687690
builder = (
688691
native.TraceExporterBuilder()
@@ -691,6 +694,7 @@ def _create_exporter(self) -> native.TraceExporter:
691694
.set_language_version(compat.PYTHON_VERSION)
692695
.set_language_interpreter(compat.PYTHON_INTERPRETER)
693696
.set_tracer_version(ddtrace.__version__)
697+
.set_git_commit_sha(commit_sha)
694698
.set_client_computed_top_level()
695699
.set_input_format(self._api_version)
696700
.set_output_format(self._api_version)
@@ -707,10 +711,14 @@ def set_test_session_token(self, token: Optional[str]) -> None:
707711
:param token: The test session token to use for authentication.
708712
"""
709713
self._test_session_token = token
714+
self._exporter.stop_worker()
715+
self._native_worker.join()
710716
self._exporter = self._create_exporter()
717+
self.start_worker_thread()
711718

712719
def recreate(self):
713-
self._exporter.drop()
720+
if self._native_worker.is_alive():
721+
self._exporter.stop_worker()
714722
return self.__class__(
715723
agent_url=self.agent_url,
716724
processing_interval=self._interval,
@@ -793,6 +801,8 @@ def _send_payload(self, payload: bytes, count: int, client: WriterClientBase):
793801

794802
try:
795803
# TODO: Return agent response from send
804+
if not self._native_worker.is_alive:
805+
self.start_worker_thread()
796806
response_body = self._exporter.send(payload, count)
797807
if self._response_cb:
798808
response = Response(body=response_body)
@@ -931,4 +941,7 @@ def on_shutdown(self):
931941
try:
932942
self.periodic()
933943
finally:
944+
if self._native_worker.is_alive():
945+
self._exporter.stop_worker()
946+
self._native_worker.join()
934947
self._exporter.shutdown(3_000_000_000) # 3 seconds timeout

0 commit comments

Comments
 (0)