Skip to content
Draft
1 change: 1 addition & 0 deletions temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ impl WorkerRef {
}

fn initiate_shutdown(&self) -> PyResult<()> {
enter_sync!(self.runtime);
let worker = self.worker.as_ref().unwrap().clone();
worker.initiate_shutdown();
Ok(())
Expand Down
7 changes: 7 additions & 0 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,7 @@ async def _run(self):
if self._started:
raise RuntimeError("Already started")
self._started = True
logger.error("SHUTDOWN-DEBUG: worker started (instrumented build)")

# Create a task that raises when a shutdown is requested
async def raise_on_shutdown():
Expand Down Expand Up @@ -837,7 +838,9 @@ async def raise_on_shutdown():
)

# Initiate core worker shutdown
logger.error("SHUTDOWN-DEBUG: initiate_shutdown called")
self._bridge_worker.initiate_shutdown()
logger.error("SHUTDOWN-DEBUG: initiate_shutdown returned")

# If any worker task had an exception, replace that task with a queue drain
for worker, task in tasks.items():
Expand All @@ -853,7 +856,9 @@ async def raise_on_shutdown():
self._nexus_worker.notify_shutdown()

# Wait for all tasks to complete (i.e. for poller loops to stop)
logger.error("SHUTDOWN-DEBUG: awaiting poller loops")
await asyncio.wait(tasks.values())
logger.error("SHUTDOWN-DEBUG: poller loops stopped")
# Sometimes both workers throw an exception and since we only take the
# first, Python may complain with "Task exception was never retrieved"
# if we don't get the others. Therefore we call cancel on each task
Expand All @@ -871,7 +876,9 @@ async def raise_on_shutdown():

# Do final shutdown
try:
logger.error("SHUTDOWN-DEBUG: finalize_shutdown called")
await self._bridge_worker.finalize_shutdown()
logger.error("SHUTDOWN-DEBUG: finalize_shutdown returned")
except:
# Ignore errors here that can arise in some tests where the bridge
# worker still has a reference
Expand Down
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DEV_SERVER_DOWNLOAD_VERSION = "v1.6.1-server-1.31.0-151.0"
DEV_SERVER_DOWNLOAD_VERSION = "v1.6.2-server-1.31.0-151.6"
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
"history.enableChasm=true",
"--dynamic-config-value",
"history.enableTransitionHistory=true",
"--dynamic-config-value",
"frontend.enableCancelWorkerPollsOnShutdown=true",
],
dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION,
)
Expand Down
Loading