From 2550b9fe0d657d344c942f2cd8a2a8cc958efb10 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 3 Apr 2026 13:53:50 -0700 Subject: [PATCH 01/13] Enable SDK flag, pick up initiate_shutdown changes --- temporalio/bridge/src/worker.rs | 8 +++++--- temporalio/bridge/worker.py | 4 ++-- temporalio/worker/_replayer.py | 2 +- temporalio/worker/_worker.py | 2 +- temporalio/worker/_workflow.py | 2 +- tests/__init__.py | 2 +- tests/conftest.py | 2 ++ 7 files changed, 13 insertions(+), 9 deletions(-) diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 4820fd843..ae8a0d85d 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -683,10 +683,12 @@ impl WorkerRef { .map_err(|err| PyValueError::new_err(format!("Failed replacing client: {err}"))) } - fn initiate_shutdown(&self) -> PyResult<()> { + fn initiate_shutdown<'p>(&self, py: Python<'p>) -> PyResult> { let worker = self.worker.as_ref().unwrap().clone(); - worker.initiate_shutdown(); - Ok(()) + self.runtime.future_into_py(py, async move { + worker.initiate_shutdown().await; + Ok(()) + }) } fn finalize_shutdown<'p>(&mut self, py: Python<'p>) -> PyResult> { diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index a9c857373..f928f0b47 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -265,9 +265,9 @@ def replace_client(self, client: temporalio.bridge.client.Client) -> None: """Replace the worker client.""" self._ref.replace_client(client._ref) # type: ignore[reportOptionalMemberAccess] - def initiate_shutdown(self) -> None: + async def initiate_shutdown(self) -> None: """Start shutdown of the worker.""" - self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess] + await self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess] async def finalize_shutdown(self) -> None: """Finalize the worker. 
diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 508d5f708..1ba678640 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -390,7 +390,7 @@ async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]: # We must shutdown here try: if bridge_worker_scope is not None: - bridge_worker_scope.initiate_shutdown() + await bridge_worker_scope.initiate_shutdown() await bridge_worker_scope.finalize_shutdown() except Exception: logger.warning("Failed to finalize shutdown", exc_info=True) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 332e2ead7..5c8006c0c 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -837,7 +837,7 @@ async def raise_on_shutdown(): ) # Initiate core worker shutdown - self._bridge_worker.initiate_shutdown() + await self._bridge_worker.initiate_shutdown() # If any worker task had an exception, replace that task with a queue drain for worker, task in tasks.items(): diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index b699e421d..31bb14c35 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -638,7 +638,7 @@ async def _handle_cache_eviction( except Exception as e: self._throw_after_activation = e logger.debug("Shutting down worker on eviction hook exception") - self._bridge_worker().initiate_shutdown() + await self._bridge_worker().initiate_shutdown() def _create_workflow_instance( self, diff --git a/tests/__init__.py b/tests/__init__.py index 5af71def3..957ad45e5 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1 +1 @@ -DEV_SERVER_DOWNLOAD_VERSION = "v1.6.1-server-1.31.0-151.0" +DEV_SERVER_DOWNLOAD_VERSION = "v1.6.2-server-1.31.0-151.6" diff --git a/tests/conftest.py b/tests/conftest.py index c813f91f9..92a1455db 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -124,6 +124,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]: 
"history.enableChasm=true", "--dynamic-config-value", "history.enableTransitionHistory=true", + "--dynamic-config-value", + "frontend.enableCancelWorkerPollsOnShutdown=true", ], dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION, ) From 0e228661455e03b47cc818c85706d8ea6b43776c Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 6 Apr 2026 09:14:04 -0700 Subject: [PATCH 02/13] Point to custom core commit --- temporalio/bridge/sdk-core | 2 +- temporalio/bridge/src/worker.rs | 9 ++++----- temporalio/bridge/worker.py | 4 ++-- temporalio/worker/_replayer.py | 2 +- temporalio/worker/_worker.py | 2 +- temporalio/worker/_workflow.py | 2 +- 6 files changed, 10 insertions(+), 11 deletions(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 71a5caa57..434617f8c 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 71a5caa57118848bd60843dd7fa867ed73704108 +Subproject commit 434617f8cdc4de41cba292b4a228810374689a8b diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index ae8a0d85d..d37226614 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -683,12 +683,11 @@ impl WorkerRef { .map_err(|err| PyValueError::new_err(format!("Failed replacing client: {err}"))) } - fn initiate_shutdown<'p>(&self, py: Python<'p>) -> PyResult> { + fn initiate_shutdown(&self) -> PyResult<()> { + enter_sync!(self.runtime); let worker = self.worker.as_ref().unwrap().clone(); - self.runtime.future_into_py(py, async move { - worker.initiate_shutdown().await; - Ok(()) - }) + worker.initiate_shutdown(); + Ok(()) } fn finalize_shutdown<'p>(&mut self, py: Python<'p>) -> PyResult> { diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index f928f0b47..a9c857373 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -265,9 +265,9 @@ def replace_client(self, client: temporalio.bridge.client.Client) -> None: """Replace the worker 
client.""" self._ref.replace_client(client._ref) # type: ignore[reportOptionalMemberAccess] - async def initiate_shutdown(self) -> None: + def initiate_shutdown(self) -> None: """Start shutdown of the worker.""" - await self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess] + self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess] async def finalize_shutdown(self) -> None: """Finalize the worker. diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 1ba678640..508d5f708 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -390,7 +390,7 @@ async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]: # We must shutdown here try: if bridge_worker_scope is not None: - await bridge_worker_scope.initiate_shutdown() + bridge_worker_scope.initiate_shutdown() await bridge_worker_scope.finalize_shutdown() except Exception: logger.warning("Failed to finalize shutdown", exc_info=True) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 5c8006c0c..332e2ead7 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -837,7 +837,7 @@ async def raise_on_shutdown(): ) # Initiate core worker shutdown - await self._bridge_worker.initiate_shutdown() + self._bridge_worker.initiate_shutdown() # If any worker task had an exception, replace that task with a queue drain for worker, task in tasks.items(): diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 31bb14c35..b699e421d 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -638,7 +638,7 @@ async def _handle_cache_eviction( except Exception as e: self._throw_after_activation = e logger.debug("Shutting down worker on eviction hook exception") - await self._bridge_worker().initiate_shutdown() + self._bridge_worker().initiate_shutdown() def _create_workflow_instance( self, From 7fb88afabaf2c3ae54d0d79e6cab20d2b31b7e6b Mon Sep 17 
00:00:00 2001 From: Andrew Yuan Date: Mon, 6 Apr 2026 22:27:19 -0700 Subject: [PATCH 03/13] point to f75a9aa0 --- temporalio/bridge/sdk-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 434617f8c..f75a9aa08 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 434617f8cdc4de41cba292b4a228810374689a8b +Subproject commit f75a9aa088cfa0d1c73aa30ce831acc7895efbab From e7340dd6233857121378d935193c1eb6752dce8b Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 7 Apr 2026 09:14:00 -0700 Subject: [PATCH 04/13] point to cd853f05 --- temporalio/bridge/sdk-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index f75a9aa08..cd853f052 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit f75a9aa088cfa0d1c73aa30ce831acc7895efbab +Subproject commit cd853f052b12f41ab10f07cabe577238e4992ab5 From 2eb7184b33fdd47fc208e4e9df9ed1959d8a9087 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 7 Apr 2026 14:26:56 -0700 Subject: [PATCH 05/13] Add Python-side shutdown timing logs for diagnosis Add logger.error statements at key shutdown points (initiate_shutdown, poller drain, finalize_shutdown) so that their timing can be correlated with TEMP_FIX poll cancellation timestamps and determine whether the race is a slow ShutdownWorker RPC or in-flight polls. 
Co-Authored-By: Claude Opus 4.6 --- temporalio/bridge/sdk-core | 2 +- temporalio/worker/_worker.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index cd853f052..0829c00f0 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit cd853f052b12f41ab10f07cabe577238e4992ab5 +Subproject commit 0829c00f000c34407d1781a94c881fa8dfa9f382 diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 332e2ead7..9b4bbbfc4 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -837,7 +837,9 @@ async def raise_on_shutdown(): ) # Initiate core worker shutdown + logger.error("SHUTDOWN-DEBUG: initiate_shutdown called") self._bridge_worker.initiate_shutdown() + logger.error("SHUTDOWN-DEBUG: initiate_shutdown returned") # If any worker task had an exception, replace that task with a queue drain for worker, task in tasks.items(): @@ -853,7 +855,9 @@ async def raise_on_shutdown(): self._nexus_worker.notify_shutdown() # Wait for all tasks to complete (i.e. for poller loops to stop) + logger.error("SHUTDOWN-DEBUG: awaiting poller loops") await asyncio.wait(tasks.values()) + logger.error("SHUTDOWN-DEBUG: poller loops stopped") # Sometimes both workers throw an exception and since we only take the # first, Python may complain with "Task exception was never retrieved" # if we don't get the others. 
Therefore we call cancel on each task @@ -871,7 +875,9 @@ async def raise_on_shutdown(): # Do final shutdown try: + logger.error("SHUTDOWN-DEBUG: finalize_shutdown called") await self._bridge_worker.finalize_shutdown() + logger.error("SHUTDOWN-DEBUG: finalize_shutdown returned") except: # Ignore errors here that can arise in some tests where the bridge # worker still has a reference From 87b4e08e8c787737bcdf50e334ff125e0cddc243 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 7 Apr 2026 15:22:00 -0700 Subject: [PATCH 06/13] retrigger CI From 066b7bb8a976fccd22fe4fee0bf9460aa69fc4e6 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 7 Apr 2026 17:06:47 -0700 Subject: [PATCH 07/13] Add canary log to confirm instrumented build is running Co-Authored-By: Claude Opus 4.6 --- temporalio/worker/_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 9b4bbbfc4..84e3a8ae0 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -781,6 +781,7 @@ async def _run(self): if self._started: raise RuntimeError("Already started") self._started = True + logger.error("SHUTDOWN-DEBUG: worker started (instrumented build)") # Create a task that raises when a shutdown is requested async def raise_on_shutdown(): From e78eb39539e5251dec04dee9a02902cd04e7424e Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 7 Apr 2026 23:08:14 -0700 Subject: [PATCH 08/13] retrigger CI attempt 1 From b11baa0272bc254c2561ed3a59053b399e77d001 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 7 Apr 2026 23:08:17 -0700 Subject: [PATCH 09/13] retrigger CI attempt 2 From 438c07eb735b80472b14fe9199ea72ca70138fd0 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 7 Apr 2026 23:08:21 -0700 Subject: [PATCH 10/13] retrigger CI attempt 3 From e7c993c6586afcd7077c587490c4e330f84fd106 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 7 Apr 2026 23:08:25 -0700 Subject: [PATCH 11/13] retrigger CI 
attempt 4 From 48ed8eb6002174a91659f3a0fdd4d03b1905ce09 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 7 Apr 2026 23:08:28 -0700 Subject: [PATCH 12/13] retrigger CI attempt 5 From 700f9bee965f3865fc00a183a46d1a3329e88d17 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 8 Apr 2026 00:17:48 -0700 Subject: [PATCH 13/13] Update sdk-core: reduce TEMP_FIX grace to 500ms for repro Co-Authored-By: Claude Opus 4.6 --- temporalio/bridge/sdk-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 0829c00f0..9d76a5150 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 0829c00f000c34407d1781a94c881fa8dfa9f382 +Subproject commit 9d76a51506fa2716b716ade01b6b2dd72e7f8099