Fork safe ConcurrentMultiSpanProcessor #4862
Conversation
os.register_at_fork(after_in_child=self._build_executor)

def _build_executor(self) -> concurrent.futures.ThreadPoolExecutor:
    return concurrent.futures.ThreadPoolExecutor(
Is there something that says explicitly that ThreadPoolExecutor needs to be reinitialized in forked processes? I see the note about not forking threaded processes, but it's still a bit unclear to me.
Is there something that says explicitly that ThreadPoolExecutor needs to be reinitialized in forked processes?
I don't think it's stated as such anywhere.
What is certain is that on POSIX systems, os.fork() keeps only the thread that called fork() running in the child process, cf https://pubs.opengroup.org/onlinepubs/9799919799/functions/fork.html:
A process shall be created with a single thread. If a multi-threaded process calls fork(), the new process shall contain a replica of the calling thread and its entire address space
Since I don't see an easy way to "restart" the threads of the ThreadPoolExecutor, starting a fresh thread pool seems like a good solution.
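For illustration, here is a condensed sketch of that approach: a class that owns a thread pool and rebuilds it in the child process after a fork. This is simplified and the class and method names are made up for the example, not the actual code of this PR.

import concurrent.futures
import os


class ForkSafeExecutorOwner:
    """Owns a ThreadPoolExecutor and rebuilds it in the child after a fork."""

    def __init__(self, num_threads: int = 2):
        self._num_threads = num_threads
        self._executor = self._new_executor()
        # The hook runs in the child right after fork(), where the inherited
        # pool's worker threads no longer exist.
        os.register_at_fork(after_in_child=self._reinit_after_fork)

    def _new_executor(self) -> concurrent.futures.ThreadPoolExecutor:
        return concurrent.futures.ThreadPoolExecutor(max_workers=self._num_threads)

    def _reinit_after_fork(self) -> None:
        # The old executor's threads died with the fork; drop it and start fresh.
        self._executor = self._new_executor()

    def submit(self, fn, *args, **kwargs):
        return self._executor.submit(fn, *args, **kwargs)

One thing to keep in mind with this pattern: os.register_at_fork holds a reference to the registered callable, so a bound method keeps the instance alive for the lifetime of the process, which seems acceptable for a processor that lives as long as the SDK anyway.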
More on this, specifically on Linux, from https://man7.org/linux/man-pages/man2/fork.2.html:
The child process is created with a single thread—the one that called fork(). The entire virtual address space of the parent is replicated in the child, including the states of mutexes, condition variables, and other pthreads objects; the use of pthread_atfork(3) may be helpful for dealing with problems that this can cause.
I still don't see how to add a fork test to the actual test suite, but here is at least an example I put together to help show what's going on here:
import os
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)

def reinit_executor():
    global executor
    executor = ThreadPoolExecutor(max_workers=2)

# Remove this line to see the child process hanging forever:
os.register_at_fork(after_in_child=reinit_executor)

print(f"pid:{os.getpid()} in main thread before fork")
print(executor.submit(lambda: f"pid:{os.getpid()} in sub thread before fork").result())

child_pid = os.fork()
if child_pid:
    print(f"pid:{os.getpid()} forked parent process to child process", child_pid)
else:
    print(f"pid:{os.getpid()} forked process, in child")

print(f"pid:{os.getpid()} in main thread after fork")
print(executor.submit(lambda: f"pid:{os.getpid()} in sub thread after fork").result())

if child_pid:
    print(f"pid:{os.getpid()} waiting for child process")
    os.waitpid(child_pid, 0)
else:
    print(f"pid:{os.getpid()} child process end")
And here is a version with ConcurrentMultiSpanProcessor.
The code below hangs forever on main and works as expected on the current branch:
import dataclasses
import os

from opentelemetry.sdk.trace import ConcurrentMultiSpanProcessor, SpanProcessor


@dataclasses.dataclass
class DummySpan:
    name: str


class PrintSpanProcessor(SpanProcessor):
    def on_start(
        self,
        span,
        parent_context=None,
    ):
        print(f"pid:{os.getpid()} on_start", span.name)


processor = ConcurrentMultiSpanProcessor()
processor.add_span_processor(PrintSpanProcessor())
processor.on_start(DummySpan(name="before fork"))

child_pid = os.fork()
if child_pid:
    print(f"pid:{os.getpid()} forked parent process to child process", child_pid)
else:
    print(f"pid:{os.getpid()} forked process, in child")

print(f"pid:{os.getpid()} in main thread after fork")
processor.on_start(DummySpan(name="a span"))

if child_pid:
    print(f"pid:{os.getpid()} waiting for child process")
    os.waitpid(child_pid, 0)
else:
    print(f"pid:{os.getpid()} child process end")
Thanks to the tests in TestBatchProcessor, I was able to write a unit test in the end that reproduces the problem 🎉
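Not the actual test from this PR, but a rough sketch of what such a fork regression test can look like (DummySpan and NoopSpanProcessor are stand-ins like in the example above; a real test would also want a timeout so that a regression fails instead of hanging the suite):

import dataclasses
import os
import unittest

from opentelemetry.sdk.trace import ConcurrentMultiSpanProcessor, SpanProcessor


@dataclasses.dataclass
class DummySpan:
    name: str


class NoopSpanProcessor(SpanProcessor):
    def on_start(self, span, parent_context=None):
        pass


class TestConcurrentMultiSpanProcessorForkSafety(unittest.TestCase):
    def test_on_start_does_not_hang_in_forked_child(self):
        processor = ConcurrentMultiSpanProcessor()
        processor.add_span_processor(NoopSpanProcessor())
        # Use the executor once so its worker threads exist before the fork.
        processor.on_start(DummySpan(name="before fork"))

        child_pid = os.fork()
        if child_pid == 0:
            # Child: without the at-fork hook this call blocks forever,
            # because the inherited executor has no live worker threads.
            processor.on_start(DummySpan(name="after fork"))
            os._exit(0)

        _, status = os.waitpid(child_pid, 0)
        self.assertEqual(status, 0)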
Description
ConcurrentMultiSpanProcessor is not "fork safe". This PR tries to solve that problem.
Forking a threaded process is not a good idea in the first place, as the Python docs note: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
Also, the OTel Python SDK suggests initializing the SDK only after forking, see https://opentelemetry-python.readthedocs.io/en/latest/examples/fork-process-model/README.html
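For reference, the pattern recommended there boils down to creating the TracerProvider only inside each worker process, e.g. in a gunicorn post_fork hook. The ConsoleSpanExporter below is just a placeholder exporter for the sketch, not necessarily what the linked page uses.

# gunicorn.conf.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter


def post_fork(server, worker):
    # The SDK and its background threads are only created after the fork,
    # so no thread state is inherited from the master process.
    provider = TracerProvider()
    provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)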
Nevertheless, on an existing code base it's not always easy to avoid all forks, so it looks like a good idea to try to make the SDK as fork safe as possible.
Note:
BatchProcessor already relies on the register_at_fork mechanism used here, see opentelemetry-python/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py, line 125 at commit 102fec2
Fixes #2349
Type of change
Please delete options that are not relevant.
How Has This Been Tested?
See unit test and discussions below.
An equivalent patch is currently running in our stack.
Does This PR Require a Contrib Repo Change?
?
Checklist: