diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6dc1ddb65..4c07251ee 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -163,7 +163,7 @@ jobs: - run: poe build-develop - run: poe lint - run: mkdir junit-xml - - run: poe test -s --junit-xml=junit-xml/latest-deps.xml + - run: poe test -s --junit-xml=junit-xml/latest-deps.xml timeout-minutes: 15 - name: "Upload junit-xml artifacts" uses: actions/upload-artifact@v4 diff --git a/README.md b/README.md index f1e995ea6..c65216160 100644 --- a/README.md +++ b/README.md @@ -2066,6 +2066,13 @@ To execute tests: poe test ``` +`poe test` spreads tests across multiple worker processes by default. If you +need a serial run for debugging, invoke pytest directly: + +```bash +uv run pytest +``` + This runs against [Temporalite](https://github.com/temporalio/temporalite). To run against the time-skipping test server, pass `--workflow-environment time-skipping`. To run against the `default` namespace of an already-running server, pass the `host:port` to `--workflow-environment`. Can also use regular pytest arguments. For example, here's how diff --git a/pyproject.toml b/pyproject.toml index e52ad5ead..b7d7b6c93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,6 +75,7 @@ dev = [ "openinference-instrumentation-google-adk>=0.1.8", "googleapis-common-protos==1.70.0", "pytest-rerunfailures>=16.1", + "pytest-xdist>=3.6,<4", "moto[s3,server]>=5", "opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2", "opentelemetry-semantic-conventions>=0.40b0,<1", @@ -118,7 +119,7 @@ lint-types = [ { cmd = "uv run basedpyright" }, ] run-bench = "uv run python scripts/run_bench.py" -test = "uv run pytest" +test = "uv run pytest -n auto --dist=worksteal" [tool.pytest.ini_options] diff --git a/tests/conftest.py b/tests/conftest.py index e2ab2149e..303af2e3b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -187,9 +187,16 @@ async def worker( @pytest.hookimpl(hookwrapper=True, trylast=True) def pytest_cmdline_main(config): # type: ignore[reportMissingParameterType, reportUnusedParameter] result = yield - if result.get_result() == 0: + exit_code = result.get_result() + numprocesses = getattr(config.option, "numprocesses", None) + running_with_xdist = hasattr(config, "workerinput") or numprocesses not in ( + None, + 0, + "0", + ) + if exit_code == 0 and not running_with_xdist: os._exit(0) - return result.get_result() + return exit_code CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT = 50 diff --git a/tests/nexus/test_workflow_caller.py b/tests/nexus/test_workflow_caller.py index 6e69039ad..ca9e2e145 100644 --- a/tests/nexus/test_workflow_caller.py +++ b/tests/nexus/test_workflow_caller.py @@ -1101,10 +1101,13 @@ async def test_async_response( return handler_wf_info = await handler_wf_handle.describe() - assert handler_wf_info.status in [ + expected_statuses = [ WorkflowExecutionStatus.RUNNING, WorkflowExecutionStatus.COMPLETED, ] + if request_cancel: + expected_statuses.append(WorkflowExecutionStatus.CANCELED) + assert handler_wf_info.status in expected_statuses await assert_handler_workflow_has_link_to_caller_workflow( caller_wf_handle, handler_wf_handle ) @@ -1508,6 +1511,9 @@ async def test_workflow_run_operation_can_execute_workflow_before_starting_backi client: Client, env: WorkflowEnvironment, ): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + task_queue = str(uuid.uuid4()) async with Worker( client, diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 99efdf30f..df85b89fb 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -639,7 +639,7 @@ async def test_sync_activity_process_cancel( picklable_activity_wait_cancel, cancel_after_ms=100, wait_for_cancellation=True, - heartbeat_timeout_ms=3000, + heartbeat_timeout_ms=5000, worker_config={"activity_executor": executor}, shared_state_manager=shared_state_manager, ) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index f123e5c61..428ea3456 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -331,10 +331,14 @@ async def test_workflow_history_info( # because just a query will have a stale representation of history # counts, but signal forces a new WFT. await handle.signal(HistoryInfoWorkflow.bunch_of_events, 1) - new_info = await handle.query(HistoryInfoWorkflow.get_history_info) - assert new_info.history_length > continue_as_new_suggest_history_count - assert new_info.history_size > orig_info.history_size - assert new_info.continue_as_new_suggested + + async def history_info_updated() -> None: + new_info = await handle.query(HistoryInfoWorkflow.get_history_info) + assert new_info.history_length > continue_as_new_suggest_history_count + assert new_info.history_size > orig_info.history_size + assert new_info.continue_as_new_suggested + + await assert_eventually(history_info_updated) @workflow.defn @@ -5317,7 +5321,11 @@ async def any_task_completed(handle: WorkflowHandle) -> bool: # because we should have timer-done poll completions every 100ms worker.client = other_env.client # Now confirm the other workflow has started - await assert_eq_eventually(True, lambda: any_task_completed(handle2)) + await assert_eq_eventually( + True, + lambda: any_task_completed(handle2), + timeout=timedelta(seconds=30), + ) # Terminate both await handle1.terminate() await handle2.terminate() @@ -8047,8 +8055,10 @@ async def test_quick_activity_swallows_cancellation(client: Client): activities=[short_activity_async], activity_executor=concurrent.futures.ThreadPoolExecutor(max_workers=1), ) as worker: - for i in range(10): - wf_duration = random.uniform(5.0, 15.0) + # Keep this deterministic and bounded. The original randomized 10-iteration + # version could exceed the per-test timeout on slower CI hosts if + # cancellation was delayed a few times in a row. + for i, wf_duration in enumerate((5.0, 7.5, 10.0)): wf_handle = await client.start_workflow( QuickActivityWorkflow.run, id=f"short_activity_wf_id-{i}", @@ -8537,23 +8547,13 @@ def emit(self, record: logging.LogRecord) -> None: async def test_disable_logger_sandbox( client: Client, ): - logger = workflow.logger.logger - handler = CustomLogHandler() - with LogHandler.apply(logger, handler): + async def execute_with_new_worker(*, disable_sandbox: bool) -> None: + workflow.logger.unsafe_disable_sandbox(disable_sandbox) async with new_worker( client, DisableLoggerSandbox, activities=[], ) as worker: - with pytest.raises(WorkflowFailureError): - await client.execute_workflow( - DisableLoggerSandbox.run, - id=f"workflow-{uuid.uuid4()}", - task_queue=worker.task_queue, - run_timeout=timedelta(seconds=1), - retry_policy=RetryPolicy(maximum_attempts=1), - ) - workflow.logger.unsafe_disable_sandbox() await client.execute_workflow( DisableLoggerSandbox.run, id=f"workflow-{uuid.uuid4()}", @@ -8561,15 +8561,15 @@ async def test_disable_logger_sandbox( run_timeout=timedelta(seconds=1), retry_policy=RetryPolicy(maximum_attempts=1), ) - workflow.logger.unsafe_disable_sandbox(False) - with pytest.raises(WorkflowFailureError): - await client.execute_workflow( - DisableLoggerSandbox.run, - id=f"workflow-{uuid.uuid4()}", - task_queue=worker.task_queue, - run_timeout=timedelta(seconds=1), - retry_policy=RetryPolicy(maximum_attempts=1), - ) + + logger = workflow.logger.logger + handler = CustomLogHandler() + with LogHandler.apply(logger, handler): + with pytest.raises(WorkflowFailureError): + await execute_with_new_worker(disable_sandbox=False) + await execute_with_new_worker(disable_sandbox=True) + with pytest.raises(WorkflowFailureError): + await execute_with_new_worker(disable_sandbox=False) @workflow.defn diff --git a/uv.lock b/uv.lock index c0fe6ec17..08ce15aad 100644 --- a/uv.lock +++ b/uv.lock @@ -945,6 +945,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/36/f4/c6e662dade71f56cd2f3735141b265c3c79293c109549c1e6933b0651ffc/exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10", size = 16674, upload-time = "2025-05-10T17:42:49.33Z" }, ] +[[package]] +name = "execnet" +version = "2.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/bf/89/780e11f9588d9e7128a3f87788354c7946a9cbb1401ad38a48c4db9a4f07/execnet-2.1.2.tar.gz", hash = "sha256:63d83bfdd9a23e35b9c6a3261412324f964c2ec8dcd8d3c6916ee9373e0befcd", size = 166622, upload-time = "2025-11-12T09:56:37.75Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ab/84/02fc1827e8cdded4aa65baef11296a9bbe595c474f0d6d758af082d849fd/execnet-2.1.2-py3-none-any.whl", hash = "sha256:67fba928dd5a544b783f6056f449e5e3931a5c378b128bc18501f7ea79e296ec", size = 40708, upload-time = "2025-11-12T09:56:36.333Z" }, +] + [[package]] name = "fastapi" version = "0.135.1" @@ -4120,6 +4129,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fa/b6/3127540ecdf1464a00e5a01ee60a1b09175f6913f0644ac748494d9c4b21/pytest_timeout-2.4.0-py3-none-any.whl", hash = "sha256:c42667e5cdadb151aeb5b26d114aff6bdf5a907f176a007a30b940d3d865b5c2", size = 14382, upload-time = "2025-05-05T19:44:33.502Z" }, ] +[[package]] +name = "pytest-xdist" +version = "3.8.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "execnet" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/78/b4/439b179d1ff526791eb921115fca8e44e596a13efeda518b9d845a619450/pytest_xdist-3.8.0.tar.gz", hash = "sha256:7e578125ec9bc6050861aa93f2d59f1d8d085595d6551c2c90b6f4fad8d3a9f1", size = 88069, upload-time = "2025-07-01T13:30:59.346Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ca/31/d4e37e9e550c2b92a9cbc2e4d0b7420a27224968580b5a447f420847c975/pytest_xdist-3.8.0-py3-none-any.whl", hash = "sha256:202ca578cfeb7370784a8c33d6d05bc6e13b4f25b5053c30a152269fd10f0b88", size = 46396, upload-time = "2025-07-01T13:30:56.632Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -4878,6 +4900,7 @@ dev = [ { name = "pytest-pretty" }, { name = "pytest-rerunfailures" }, { name = "pytest-timeout" }, + { name = "pytest-xdist" }, { name = "ruff" }, { name = "toml" }, { name = "twine" }, @@ -4935,6 +4958,7 @@ dev = [ { name = "pytest-pretty", specifier = ">=1.3.0" }, { name = "pytest-rerunfailures", specifier = ">=16.1" }, { name = "pytest-timeout", specifier = "~=2.2" }, + { name = "pytest-xdist", specifier = ">=3.6,<4" }, { name = "ruff", specifier = ">=0.5.0,<0.6" }, { name = "toml", specifier = ">=0.10.2,<0.11" }, { name = "twine", specifier = ">=4.0.1,<5" },