Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ jobs:
- run: poe build-develop
- run: poe lint
- run: mkdir junit-xml
- run: poe test -s --junit-xml=junit-xml/latest-deps.xml
- run: poe test -s --junit-xml=junit-xml/latest-deps.xml
timeout-minutes: 15
- name: "Upload junit-xml artifacts"
uses: actions/upload-artifact@v4
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,13 @@ To execute tests:
poe test
```

`poe test` spreads tests across multiple worker processes by default. If you
need a serial run for debugging, invoke pytest directly:

```bash
uv run pytest
```

This runs against [Temporalite](https://github.com/temporalio/temporalite). To run against the time-skipping test
server, pass `--workflow-environment time-skipping`. To run against the `default` namespace of an already-running
server, pass the `host:port` to `--workflow-environment`. Can also use regular pytest arguments. For example, here's how
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ dev = [
"openinference-instrumentation-google-adk>=0.1.8",
"googleapis-common-protos==1.70.0",
"pytest-rerunfailures>=16.1",
"pytest-xdist>=3.6,<4",
"moto[s3,server]>=5",
"opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
"opentelemetry-semantic-conventions>=0.40b0,<1",
Expand Down Expand Up @@ -118,7 +119,7 @@ lint-types = [
{ cmd = "uv run basedpyright" },
]
run-bench = "uv run python scripts/run_bench.py"
test = "uv run pytest"
test = "uv run pytest -n auto --dist=worksteal"


[tool.pytest.ini_options]
Expand Down
11 changes: 9 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,16 @@ async def worker(
@pytest.hookimpl(hookwrapper=True, trylast=True)
def pytest_cmdline_main(config): # type: ignore[reportMissingParameterType, reportUnusedParameter]
result = yield
if result.get_result() == 0:
exit_code = result.get_result()
numprocesses = getattr(config.option, "numprocesses", None)
running_with_xdist = hasattr(config, "workerinput") or numprocesses not in (
None,
0,
"0",
)
if exit_code == 0 and not running_with_xdist:
os._exit(0)
return result.get_result()
return exit_code


CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT = 50
Expand Down
8 changes: 7 additions & 1 deletion tests/nexus/test_workflow_caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,10 +1101,13 @@ async def test_async_response(
return

handler_wf_info = await handler_wf_handle.describe()
assert handler_wf_info.status in [
expected_statuses = [
WorkflowExecutionStatus.RUNNING,
WorkflowExecutionStatus.COMPLETED,
]
if request_cancel:
expected_statuses.append(WorkflowExecutionStatus.CANCELED)
assert handler_wf_info.status in expected_statuses
await assert_handler_workflow_has_link_to_caller_workflow(
caller_wf_handle, handler_wf_handle
)
Expand Down Expand Up @@ -1508,6 +1511,9 @@ async def test_workflow_run_operation_can_execute_workflow_before_starting_backi
client: Client,
env: WorkflowEnvironment,
):
if env.supports_time_skipping:
pytest.skip("Nexus tests don't work with time-skipping server")

task_queue = str(uuid.uuid4())
async with Worker(
client,
Expand Down
2 changes: 1 addition & 1 deletion tests/worker/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ async def test_sync_activity_process_cancel(
picklable_activity_wait_cancel,
cancel_after_ms=100,
wait_for_cancellation=True,
heartbeat_timeout_ms=3000,
heartbeat_timeout_ms=5000,
worker_config={"activity_executor": executor},
shared_state_manager=shared_state_manager,
)
Expand Down
56 changes: 28 additions & 28 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,14 @@ async def test_workflow_history_info(
# because just a query will have a stale representation of history
# counts, but signal forces a new WFT.
await handle.signal(HistoryInfoWorkflow.bunch_of_events, 1)
new_info = await handle.query(HistoryInfoWorkflow.get_history_info)
assert new_info.history_length > continue_as_new_suggest_history_count
assert new_info.history_size > orig_info.history_size
assert new_info.continue_as_new_suggested

async def history_info_updated() -> None:
new_info = await handle.query(HistoryInfoWorkflow.get_history_info)
assert new_info.history_length > continue_as_new_suggest_history_count
assert new_info.history_size > orig_info.history_size
assert new_info.continue_as_new_suggested

await assert_eventually(history_info_updated)


@workflow.defn
Expand Down Expand Up @@ -5317,7 +5321,11 @@ async def any_task_completed(handle: WorkflowHandle) -> bool:
# because we should have timer-done poll completions every 100ms
worker.client = other_env.client
# Now confirm the other workflow has started
await assert_eq_eventually(True, lambda: any_task_completed(handle2))
await assert_eq_eventually(
True,
lambda: any_task_completed(handle2),
timeout=timedelta(seconds=30),
)
# Terminate both
await handle1.terminate()
await handle2.terminate()
Expand Down Expand Up @@ -8047,8 +8055,10 @@ async def test_quick_activity_swallows_cancellation(client: Client):
activities=[short_activity_async],
activity_executor=concurrent.futures.ThreadPoolExecutor(max_workers=1),
) as worker:
for i in range(10):
wf_duration = random.uniform(5.0, 15.0)
# Keep this deterministic and bounded. The original randomized 10-iteration
# version could exceed the per-test timeout on slower CI hosts if
# cancellation was delayed a few times in a row.
for i, wf_duration in enumerate((5.0, 7.5, 10.0)):
wf_handle = await client.start_workflow(
QuickActivityWorkflow.run,
id=f"short_activity_wf_id-{i}",
Expand Down Expand Up @@ -8537,39 +8547,29 @@ def emit(self, record: logging.LogRecord) -> None:
async def test_disable_logger_sandbox(
client: Client,
):
logger = workflow.logger.logger
handler = CustomLogHandler()
with LogHandler.apply(logger, handler):
async def execute_with_new_worker(*, disable_sandbox: bool) -> None:
workflow.logger.unsafe_disable_sandbox(disable_sandbox)
async with new_worker(
client,
DisableLoggerSandbox,
activities=[],
) as worker:
with pytest.raises(WorkflowFailureError):
await client.execute_workflow(
DisableLoggerSandbox.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
run_timeout=timedelta(seconds=1),
retry_policy=RetryPolicy(maximum_attempts=1),
)
workflow.logger.unsafe_disable_sandbox()
await client.execute_workflow(
DisableLoggerSandbox.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
run_timeout=timedelta(seconds=1),
retry_policy=RetryPolicy(maximum_attempts=1),
)
workflow.logger.unsafe_disable_sandbox(False)
with pytest.raises(WorkflowFailureError):
await client.execute_workflow(
DisableLoggerSandbox.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
run_timeout=timedelta(seconds=1),
retry_policy=RetryPolicy(maximum_attempts=1),
)

logger = workflow.logger.logger
handler = CustomLogHandler()
with LogHandler.apply(logger, handler):
with pytest.raises(WorkflowFailureError):
await execute_with_new_worker(disable_sandbox=False)
await execute_with_new_worker(disable_sandbox=True)
with pytest.raises(WorkflowFailureError):
await execute_with_new_worker(disable_sandbox=False)


@workflow.defn
Expand Down
24 changes: 24 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading