diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 71a5caa57..9d76a5150 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 71a5caa57118848bd60843dd7fa867ed73704108 +Subproject commit 9d76a51506fa2716b716ade01b6b2dd72e7f8099 diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 4820fd843..d37226614 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -684,6 +684,7 @@ impl WorkerRef { } fn initiate_shutdown(&self) -> PyResult<()> { + enter_sync!(self.runtime); let worker = self.worker.as_ref().unwrap().clone(); worker.initiate_shutdown(); Ok(()) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 332e2ead7..84e3a8ae0 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -781,6 +781,7 @@ async def _run(self): if self._started: raise RuntimeError("Already started") self._started = True + logger.error("SHUTDOWN-DEBUG: worker started (instrumented build)") # Create a task that raises when a shutdown is requested async def raise_on_shutdown(): @@ -837,7 +838,9 @@ async def raise_on_shutdown(): ) # Initiate core worker shutdown + logger.error("SHUTDOWN-DEBUG: initiate_shutdown called") self._bridge_worker.initiate_shutdown() + logger.error("SHUTDOWN-DEBUG: initiate_shutdown returned") # If any worker task had an exception, replace that task with a queue drain for worker, task in tasks.items(): @@ -853,7 +856,9 @@ async def raise_on_shutdown(): self._nexus_worker.notify_shutdown() # Wait for all tasks to complete (i.e. for poller loops to stop) + logger.error("SHUTDOWN-DEBUG: awaiting poller loops") await asyncio.wait(tasks.values()) + logger.error("SHUTDOWN-DEBUG: poller loops stopped") # Sometimes both workers throw an exception and since we only take the # first, Python may complain with "Task exception was never retrieved" # if we don't get the others. Therefore we call cancel on each task @@ -871,7 +876,9 @@ async def raise_on_shutdown(): # Do final shutdown try: + logger.error("SHUTDOWN-DEBUG: finalize_shutdown called") await self._bridge_worker.finalize_shutdown() + logger.error("SHUTDOWN-DEBUG: finalize_shutdown returned") except: # Ignore errors here that can arise in some tests where the bridge # worker still has a reference diff --git a/tests/__init__.py b/tests/__init__.py index 5af71def3..957ad45e5 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1 +1 @@ -DEV_SERVER_DOWNLOAD_VERSION = "v1.6.1-server-1.31.0-151.0" +DEV_SERVER_DOWNLOAD_VERSION = "v1.6.2-server-1.31.0-151.6" diff --git a/tests/conftest.py b/tests/conftest.py index c813f91f9..92a1455db 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -124,6 +124,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]: "history.enableChasm=true", "--dynamic-config-value", "history.enableTransitionHistory=true", + "--dynamic-config-value", + "frontend.enableCancelWorkerPollsOnShutdown=true", ], dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION, )