Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from datetime import datetime, timedelta
from threading import Event, Thread
from types import GeneratorType
from typing import Any, Generator, Optional, Sequence, TypeVar, Union
from typing import Any, Generator, Iterator, Optional, Sequence, TypeVar, Union

import grpc
from google.protobuf import empty_pb2
Expand All @@ -30,7 +30,7 @@
# If `opentelemetry-sdk` is available, enable the tracer
try:
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.trace.propagation.tracecontext TraceContextTextMapPropagator

otel_propagator = TraceContextTextMapPropagator()
otel_tracer = trace.get_tracer(__name__)
Expand Down Expand Up @@ -283,7 +283,7 @@ class TaskHubGrpcWorker:
activity function.
"""

_response_stream: Optional[grpc.Future] = None
_response_stream: Optional[Iterator[Any]] = None
_interceptors: Optional[list[shared.ClientInterceptor]] = None

def __init__(
Expand Down Expand Up @@ -418,10 +418,10 @@ def create_fresh_connection():

def invalidate_connection():
nonlocal current_channel, current_stub, current_reader_thread
# Cancel the response stream first to signal the reader thread to stop
# Close the response stream first to signal the reader thread to stop
if self._response_stream is not None:
try:
self._response_stream.cancel()
self._response_stream.close()
except Exception:
pass
self._response_stream = None
Expand Down Expand Up @@ -740,7 +740,10 @@ def stop(self):

self._logger.info("Stopping gRPC worker...")
if self._response_stream is not None:
self._response_stream.cancel()
try:
self._response_stream.close()
except Exception as e:
self._logger.exception(f"Error stopping response stream: {e}")
self._shutdown.set()
# Explicitly close the gRPC channel to ensure OTel interceptors and other resources are cleaned up
if self._current_channel is not None:
Expand Down
Loading