fix: preserve push notification event order per task (closes #775) #776
perhaps468 wants to merge 1 commit into a2aproject:main from
Code Review
This pull request replaces the global asynchronous push notification system with a per-task bounded queue and dedicated consumer threads to ensure FIFO ordering and isolation. The review feedback highlights a potential blocking issue in the main event loop during notifier cleanup, a race condition in the queue overflow logic when accessed by multiple threads, and the need for a more robust test case to properly verify the "drop oldest" behavior.
notifiers.compute(taskId, (id, existing) -> {
    if (existing != null && !existing.isExpired()) {
        existing.touch();
        return existing;
    }
    if (existing != null) existing.shutdown();
    return new TaskPushNotifier(taskId);
}).offer(event);
The shutdown() method on an expired TaskPushNotifier is called synchronously within the notifiers.compute() block. This block is executed by the single MainEventBusProcessor thread, which is a critical component for processing all events. The shutdown() method contains a consumerThread.join(2000), which can block for up to 2 seconds if the consumer thread is stuck (e.g., in a non-interruptible network call within pushSender). This could severely degrade or stall the main event processing loop.
Consider making the shutdown of expired notifiers asynchronous to avoid blocking the MainEventBusProcessor thread. You could use a dedicated executor or simply a new thread for this cleanup task.
notifiers.compute(taskId, (id, existing) -> {
    if (existing != null && !existing.isExpired()) {
        existing.touch();
        return existing;
    }
    if (existing != null) {
        new Thread(existing::shutdown, "Notifier-Shutdown-" + existing.taskId).start();
    }
    return new TaskPushNotifier(taskId);
}).offer(event);

void offer(StreamingEventKind event) {
    if (shutdown) return;
    touch();
    while (!queue.offer(event)) {
        // Queue is full - drop oldest to make room for newest
        queue.poll();
    }
}
The offer method can be called by multiple producer threads concurrently for the same task. The current implementation of the while loop to handle a full queue is not atomic. This can lead to a race condition where more than one "oldest" event is dropped to make room for new events if multiple threads attempt to offer to a full queue at the same time. To ensure that only one event is dropped per new event added, you should synchronize this method.
- void offer(StreamingEventKind event) {
-     if (shutdown) return;
-     touch();
-     while (!queue.offer(event)) {
-         // Queue is full - drop oldest to make room for newest
-         queue.poll();
-     }
- }
+ synchronized void offer(StreamingEventKind event) {
+     if (shutdown) return;
+     touch();
+     while (!queue.offer(event)) {
+         // Queue is full - drop oldest to make room for newest
+         queue.poll();
+     }
+ }
References
- Using AtomicBoolean or synchronization is a general recommendation for managing shared mutable state across multiple threads to ensure atomicity.
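The drop-oldest behavior discussed in this comment can be sketched in isolation. The class below is a hypothetical stand-in (DropOldestBuffer and snapshot() are not names from the PR); it only shows how a synchronized offer keeps the poll-then-offer pair atomic with respect to other producers. A consumer that takes from the queue concurrently is not excluded by this lock, which is harmless here: it only frees a slot sooner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for the per-task queue; not the PR's TaskPushNotifier. */
class DropOldestBuffer<E> {
    private final LinkedBlockingQueue<E> queue;

    DropOldestBuffer(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Adds an event, evicting the oldest pending event while the queue is full.
     * synchronized ensures that two producers cannot interleave their
     * poll()/offer() calls and evict more events than they insert.
     */
    synchronized void offer(E event) {
        while (!queue.offer(event)) {
            queue.poll(); // queue is full: drop the oldest pending event
        }
    }

    /** Copies the pending events, oldest first (for inspection only). */
    List<E> snapshot() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        DropOldestBuffer<Integer> buffer = new DropOldestBuffer<>(3);
        for (int i = 1; i <= 5; i++) {
            buffer.offer(i);
        }
        System.out.println(buffer.snapshot());
    }
}
```

With a capacity of 3 and five offers, events 1 and 2 are evicted and the three newest remain pending.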
@Test
public void testQueueOverflowDropsOldest() throws Exception {
    // Test that the per-task queue has a bounded capacity.
    String taskId = "overflow-task";
    int capacity = 50;

    AtomicInteger sentCount = new AtomicInteger(0);

    PushNotificationSender countingSender = event -> {
        sentCount.incrementAndGet();
        pushSender.sendNotification(event);
    };

    MainEventBusProcessor processor = new MainEventBusProcessor(
            mainEventBus, mock(TaskStore.class), countingSender, queueManager);

    // Submit exactly the queue capacity worth of events
    for (int i = 0; i < capacity; i++) {
        Task event = createTask(taskId, i);
        processor.sendPushNotification(taskId, event);
    }

    // Wait for all to be consumed
    Thread.sleep(1000);

    // All capacity events should be sent (no overflow)
    assertEquals(capacity, sentCount.get(),
            "All " + capacity + " events should be sent when queue is exactly full");

    processor.stop();
}
The test testQueueOverflowDropsOldest doesn't seem to test the overflow condition as its name implies. It submits exactly capacity items and asserts that all are processed. This confirms the queue can hold capacity items, but it doesn't verify the "drop oldest" behavior on overflow.
A more robust test would:
- Submit capacity + N events.
- Ensure the consumer has a chance to run but doesn't empty the queue completely (or is blocked).
- Verify that only the latest capacity events are in the queue (or were processed), and that the first N events were dropped.
For example, you could submit capacity + 1 events and check that the first event was dropped and the last one was processed.
References
- While stricter coding practices can be relaxed for test code, the test logic must still accurately verify the intended behavior and edge cases.
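One way to make the overflow deterministic, as suggested above, is to hold the consumer on a latch while the producer overfills the queue. The sketch below is self-contained and does not use the PR's classes: OverflowTestSketch, the integer events, and the gate event 0 are all hypothetical. In a real test the same latch could live inside a stubbed PushNotificationSender.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class OverflowTestSketch {

    /** Overfills a bounded queue while the consumer is held; returns what was delivered. */
    static List<Integer> run(int capacity, int extra) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        List<Integer> sent = new CopyOnWriteArrayList<>();
        CountDownLatch consumerStuck = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch delivered = new CountDownLatch(1 + capacity);

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer event = queue.take();
                    if (event == 0) {            // gate event: hold the consumer here
                        consumerStuck.countDown();
                        release.await();
                    }
                    sent.add(event);
                    delivered.countDown();
                }
            } catch (InterruptedException e) {
                // interrupted by the test below to end the loop
            }
        }, "consumer");
        consumer.setDaemon(true);
        consumer.start();

        try {
            queue.offer(0);
            if (!consumerStuck.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumer never picked up the gate event");
            }
            // The consumer is held, so the queue fills and the oldest events are dropped.
            for (int i = 1; i <= capacity + extra; i++) {
                while (!queue.offer(i)) {
                    queue.poll();
                }
            }
            release.countDown();
            boolean done = delivered.await(5, TimeUnit.SECONDS);
            consumer.interrupt();
            consumer.join(1000);
            if (!done) {
                throw new IllegalStateException("timed out waiting for delivery");
            }
            return List.copyOf(sent);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2));
    }
}
```

With capacity 3 and 2 extra events, the delivered list is the gate event followed by events 3, 4 and 5, which shows that events 1 and 2 were the ones dropped. No Thread.sleep is needed because every wait is tied to a latch.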
Summary
Push notifications for streaming events (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent) could arrive out of order on the client side because the previous implementation used CompletableFuture.runAsync(), which submits to the shared ForkJoinPool.commonPool(). Events from concurrent tasks compete for the same worker threads there and can be interleaved unpredictably.
This fix replaces the shared-pool approach with per-task bounded queues, each with a dedicated consumer thread, guaranteeing FIFO ordering per task and complete isolation between tasks.
Problem
In the original implementation (MainEventBusProcessor.sendPushNotification()), each event is handed to CompletableFuture.runAsync(). ForkJoinPool.commonPool() has a default parallelism of Runtime.getRuntime().availableProcessors() - 1 (typically in the 8–16 range). When multiple concurrent tasks each produce multiple events, those events race for worker threads, and two events from the same task can run on different workers at the same time. The consumer can therefore receive task A's Event 2 before Event 1, purely as a scheduling artifact.
Options Considered
Analysis of the Original Approach
The original implementation using CompletableFuture.runAsync() against ForkJoinPool.commonPool() had clear advantages worth acknowledging:
Original pros:
Original cons:
Is this optimization necessary? For a protocol like A2A where clients consume streams of events (text chunks arriving in order, artifacts building up incrementally), out-of-order delivery is a correctness bug, not just a performance issue. A client expecting working → partial-completed → completed could see partial-completed → working → completed, breaking the expectations of any state-machine logic in the client. This is particularly critical for:
Given the correctness implications, the per-task isolation is worth the added complexity.
Option: Single-threaded executor per task (chosen)
Each task gets its own LinkedBlockingQueue + dedicated daemon thread.
Pros:
Cons:
Solution
Each task gets a TaskPushNotifier (inner class of MainEventBusProcessor) containing:
- LinkedBlockingQueue<StreamingEventKind> with capacity 50
- a dedicated consumer thread running consumeLoop()
- ConcurrentHashMap<String, TaskPushNotifier> notifiers for per-task lookups
Overflow behavior: If the queue is full when a new event arrives, the oldest pending event is silently dropped to make room. This is a deliberate design choice: if the producer blocked instead, a queue that stays full would stall it indefinitely.
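The structure described above can be sketched roughly as follows. This is not the PR's code: PerTaskDispatcherSketch, TaskQueue, the String event type, the BiConsumer sender, and demo() are hypothetical, and expiry handling (touch() / isExpired()) is left out. It only illustrates how one bounded queue plus one daemon consumer thread per task yields FIFO delivery per task.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class PerTaskDispatcherSketch {
    private static final int CAPACITY = 50; // capacity stated in the PR description

    private final Map<String, TaskQueue> notifiers = new ConcurrentHashMap<>();
    private final BiConsumer<String, String> sender; // (taskId, event); hypothetical sender type

    PerTaskDispatcherSketch(BiConsumer<String, String> sender) {
        this.sender = sender;
    }

    /** Routes the event to the queue owned by its task, creating the queue on first use. */
    void send(String taskId, String event) {
        notifiers.computeIfAbsent(taskId, id -> new TaskQueue(id)).offer(event);
    }

    /** Interrupts every consumer thread and forgets all per-task queues. */
    void stop() {
        notifiers.values().forEach(TaskQueue::shutdown);
        notifiers.clear();
    }

    private final class TaskQueue {
        private final String taskId;
        private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(CAPACITY);
        private final Thread consumer;

        TaskQueue(String taskId) {
            this.taskId = taskId;
            this.consumer = new Thread(this::consumeLoop, "push-" + taskId);
            this.consumer.setDaemon(true);
            this.consumer.start();
        }

        synchronized void offer(String event) {
            while (!queue.offer(event)) {
                queue.poll(); // full: drop the oldest pending event
            }
        }

        /** Single consumer per task, so events leave in the order they were queued. */
        private void consumeLoop() {
            try {
                while (true) {
                    sender.accept(taskId, queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown requested
            }
        }

        void shutdown() {
            consumer.interrupt();
        }
    }

    /** Sends interleaved events for two tasks; keep eventsPerTask <= CAPACITY to avoid drops. */
    static Map<String, List<String>> demo(int eventsPerTask) {
        Map<String, List<String>> received = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(2 * eventsPerTask);
        PerTaskDispatcherSketch dispatcher = new PerTaskDispatcherSketch((taskId, event) -> {
            received.computeIfAbsent(taskId, id -> new CopyOnWriteArrayList<>()).add(event);
            done.countDown();
        });
        try {
            for (int i = 0; i < eventsPerTask; i++) {
                dispatcher.send("task-a", "e" + i);
                dispatcher.send("task-b", "e" + i);
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for delivery");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            dispatcher.stop();
        }
        return received;
    }
}
```

Calling demo(3) delivers e0, e1, e2 to each task in that order, regardless of how the two consumer threads are scheduled relative to each other. The cost of this design is one thread per active task, which is why the PR pairs it with expiry and a stop() hook.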
Test Results
New tests (MainEventBusProcessorPushNotificationOrderTest)
- testSameTaskEventsOrdered
- testDifferentTasksUnaffected
- testQueueOverflowDropsOldest
Regression tests (MainEventBusProcessorExceptionTest)
All 3 existing exception handling tests continue to pass.
Files Changed
server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
- ConcurrentHashMap<String, TaskPushNotifier> notifiers
- TaskPushNotifier inner class (bounded queue + consumer thread + expiry)
- sendPushNotification() to route via per-task notifier
- @PreDestroy stop() to clean up all notifier threads
server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorPushNotificationOrderTest.java (new)