
[Feature]: concurrency performance improvement on nested waits/retries #325

@yaythomas

Description

What would you like?

Optimize Empty Checkpoint Coalescing in Checkpoint Batcher

Problem

The checkpoint batcher filters out empty checkpoints only AFTER collecting a batch, rather than coalescing them BEFORE batching.

When many map/parallel iterations resume from wait operations simultaneously, each queues a separate empty checkpoint operation. The current implementation collects these into batches of up to 250 operations, then filters out the empty checkpoints before making the API call. This results in unnecessary batches and API calls.

This impacts map/parallel operations that contain wait, retry, or wait_for_condition:

  • High-concurrency scenarios (100+ concurrent branches/items)
  • Operations that suspend (wait, wait_for_condition, step retries)
  • Multiple resume cycles (polling, retries)

How It Happens in the Code

Step 1: Empty checkpoints are queued separately

When map iterations resume from wait, the resubmitter callback is invoked:

# In concurrency/executor.py
def resubmitter(executable_with_state: ExecutableWithState) -> None:
    execution_state.create_checkpoint()  # operation_update=None (empty checkpoint)
    submit_task(executable_with_state)

Each call to create_checkpoint() with no arguments queues a separate QueuedOperation:

# In state.py - create_checkpoint()
completion_event = CompletionEvent() if is_sync else None
queued_op = QueuedOperation(operation_update=None, completion_event=completion_event)  # Separate object per call
self._checkpoint_queue.put(queued_op)  # Each queued individually
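A minimal, self-contained sketch of this behavior (the `QueuedOperation` dataclass and module-level queue here are stand-ins, not the library's actual types): when N iterations resume at once, N separate objects land on the queue, even though every one of them is an empty checkpoint.

```python
import queue
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class QueuedOperation:
    # Stand-in for the real QueuedOperation: None means an empty checkpoint.
    operation_update: Optional[Any]
    completion_event: Optional[Any] = None


checkpoint_queue: "queue.Queue[QueuedOperation]" = queue.Queue()


def create_checkpoint(operation_update: Optional[Any] = None) -> None:
    # Every call enqueues its own object, even when operation_update is None.
    checkpoint_queue.put(QueuedOperation(operation_update=operation_update))


# Simulate 999 map iterations resuming from a wait at roughly the same time.
for _ in range(999):
    create_checkpoint()

print(checkpoint_queue.qsize())  # 999 separate empty-checkpoint objects
```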

Step 2: Batcher collects up to 250 operations

The background thread collects operations into batches:

# In state.py - _collect_checkpoint_batch()
batch: list[QueuedOperation] = []
# ... collects up to max_batch_operations (250) ...
return batch  # Returns 250 QueuedOperation objects
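The batch-size arithmetic can be sketched as follows (`MAX_BATCH_OPERATIONS = 250` is taken from the issue text; the function is an illustration, not the library's `_collect_checkpoint_batch`): greedy collection splits 999 queued operations into the 250 + 250 + 250 + 249 batches described below.

```python
MAX_BATCH_OPERATIONS = 250  # assumed value from the issue text


def collect_batch_sizes(n_queued: int, max_batch: int = MAX_BATCH_OPERATIONS) -> list[int]:
    # Greedy collection: each pass drains up to max_batch operations.
    sizes = []
    remaining = n_queued
    while remaining > 0:
        take = min(remaining, max_batch)
        sizes.append(take)
        remaining -= take
    return sizes


print(collect_batch_sizes(999))  # [250, 250, 250, 249]
```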

Step 3: Empty checkpoints are filtered AFTER batching

Only when preparing the API call are empty checkpoints filtered:

# In state.py - checkpoint_batches_forever()
batch = self._collect_checkpoint_batch()  # 250 operations collected

# Filter happens HERE - after batching
updates = [q.operation_update for q in batch if q.operation_update is not None]

# API call with filtered updates
output = self._service_client.checkpoint(
    updates=updates,  # Empty list [] if all were empty checkpoints
    ...
)

The Inefficiency

For 999 empty checkpoints:

  1. 999 separate QueuedOperation objects queued
  2. Batcher collects them in 4 batches: 250 + 250 + 250 + 249
  3. Each batch filters empties: updates = []
  4. 4 API calls made with updates=[]
  5. Each batch waits 1 second for batching window

Result: 4 API calls, ~4 seconds processing time

Optimal: 1 API call, ~1.2 seconds processing time
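The 4-vs-1 figure follows directly from the batch arithmetic. A small sketch (assuming the 250-operation batch cap from the issue) comparing the current behavior with ideal coalescing, where all consecutive empty checkpoints collapse into a single batch:

```python
import math

MAX_BATCH = 250  # assumed max_batch_operations


def api_calls_current(n_empty: int) -> int:
    # Empties are filtered only after batching, so they still occupy batch
    # slots, and each batch triggers its own (empty) checkpoint call.
    return math.ceil(n_empty / MAX_BATCH)


def api_calls_coalesced(n_empty: int) -> int:
    # If empty checkpoints coalesce before batching, one call flushes them all.
    return 1 if n_empty else 0


print(api_calls_current(999), api_calls_coalesced(999))  # 4 1
```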

Impact

This affects high-concurrency map/parallel operations with wait operations:

  • API overhead: 3-4x more API calls than necessary
  • Memory usage: 999 queue objects instead of 1 marker + 999 events
  • Processing time: 3x slower batch processing
  • Latency: Items resume serially through batches instead of simultaneously
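The "1 marker + 999 events" figure above suggests one possible shape for a fix. Here is a toy sketch of coalescing at enqueue time (none of these names exist in the codebase; this is an illustration under assumptions, not a proposed patch): when an empty checkpoint is enqueued and the queue tail is already an empty marker, only the completion event is recorded, so no new queue object is created.

```python
import threading
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class EmptyMarker:
    # One marker carrying all waiters, instead of one queue object per caller.
    completion_events: list = field(default_factory=list)


class CoalescingQueue:
    """Toy queue (hypothetical, not the library's API) that merges consecutive
    empty checkpoints into a single marker at enqueue time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._items: list = []

    def put_checkpoint(
        self,
        operation_update: Optional[Any],
        completion_event: Optional[threading.Event] = None,
    ) -> None:
        with self._lock:
            if operation_update is None:
                if self._items and isinstance(self._items[-1], EmptyMarker):
                    # Coalesce: no new queue object, just remember the waiter.
                    if completion_event is not None:
                        self._items[-1].completion_events.append(completion_event)
                    return
                marker = EmptyMarker()
                if completion_event is not None:
                    marker.completion_events.append(completion_event)
                self._items.append(marker)
            else:
                self._items.append(operation_update)

    def __len__(self) -> int:
        return len(self._items)


q = CoalescingQueue()
for _ in range(999):
    q.put_checkpoint(None, threading.Event())
print(len(q))  # 1 marker instead of 999 queued objects
```

When the batcher flushes the marker, it would make a single checkpoint call and then set all 999 completion events, letting every waiting iteration resume at once.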

Possible Implementation

No response

Is this a breaking change?

No

Does this require an RFC?

No

Additional Context

No response
