From 4d6f79d29134b2f95f31fda2b37bebe3e9a08b15 Mon Sep 17 00:00:00 2001 From: ksalazar-91 Date: Mon, 22 Jun 2026 22:47:49 -0700 Subject: [PATCH 1/3] [Service Bus] Fix spurious DeliveryNotOnLinkException in session processor (#47356) A session-enabled ServiceBusProcessorClient that settles messages manually (e.g. complete()) while auto-complete is left enabled logged a spurious DeliveryNotOnLinkException ("...does not exist in the link's DeliveryMap") at ERROR. The V2 session disposition path (SessionsMessagePump) never called message.setIsSettled() on success, so the isSettled() guard could not absorb the redundant auto-settlement the way the non-session and V1 session paths do. The second disposition then reached the receive-link after the delivery had already been removed from the DeliveryMap, producing the error log. The message was always settled correctly on the broker; only the misleading ERROR log is removed. Now the session path marks the message settled on a successful disposition, mirroring ServiceBusReceiverAsyncClient, so a redundant settle short-circuits at the already-settled guard. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md | 6 ++++++ .../messaging/servicebus/SessionsMessagePump.java | 10 +++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index 1ecae856a4cf..909c4053200a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -15,6 +15,12 @@ Premium large-message entities by reading the `com.microsoft:max-message-batch-size` vendor property from the AMQP sender link instead of using `max-message-size`. ([#48214](https://github.com/Azure/azure-sdk-for-java/pull/48214)) - Fixed `ServiceBusAdministrationClient.updateSubscription()` silently ignoring `defaultMessageTimeToLive` changes. The property was incorrectly nullified before serialization. ([#48495](https://github.com/Azure/azure-sdk-for-java/issues/48495)) +- Fixed session-enabled `ServiceBusProcessorClient` logging a spurious `DeliveryNotOnLinkException` + ("...does not exist in the link's DeliveryMap") at ERROR when a message handler settles a message + manually (e.g. `complete()`) while auto-complete is left enabled. The V2 session disposition path now + marks the message settled on success, so the redundant auto-settlement short-circuits at the + already-settled guard instead of attempting a second disposition on the receive-link. The message was + always settled correctly; only the misleading error log is removed. ([#47356](https://github.com/Azure/azure-sdk-for-java/issues/47356)) ### Other Changes diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java index 3dd1febd98bb..be876db0822d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java @@ -894,7 +894,15 @@ private Mono updateDisposition(ServiceBusReceivedMessage message, Disposit Mono updateDispositionMono; if (receiver != null) { - updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState); + // Mark the message settled once the broker acknowledges the disposition, mirroring the non-session + // and V1 session paths (ServiceBusReceiverAsyncClient). This arms the message.isSettled() guard above + // so that a redundant settlement attempt (e.g. when the handler calls complete() manually AND + // auto-complete is left enabled) short-circuits here with a benign "already settled" error instead of + // reaching the receive-link and logging a spurious DeliveryNotOnLinkException once the delivery has + // already been removed from the link's DeliveryMap. + // See https://github.com/Azure/azure-sdk-for-java/issues/47356 + updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState) + .doOnSuccess(__ -> message.setIsSettled()); } else { updateDispositionMono = Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(message.getLockToken(), deliveryState)); From 5e9b2ff9cbeaa5f78b51537a1d84456226c07217 Mon Sep 17 00:00:00 2001 From: ksalazar-91 Date: Mon, 22 Jun 2026 23:22:03 -0700 Subject: [PATCH 2/3] [Service Bus] Add regression test for session double-settle (#47356) Adds SessionsMessagePumpIsolatedTest#shouldNotReDispositionWhenHandlerSettlesWithAutoCompleteEnabled, covering the V2 session path where a handler settles a message manually while auto-complete is enabled. Verifies the message is marked settled and the redundant auto-settlement short-circuits at the isSettled() guard so the receive-link sees exactly one disposition (no second attempt that would raise DeliveryNotOnLinkException). Addresses the PR review feedback. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../SessionsMessagePumpIsolatedTest.java | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java index 5d5003efe4a6..9c8cf0e77c3a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java @@ -48,6 +48,7 @@ import java.util.function.Supplier; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -501,6 +502,62 @@ public void shouldCompleteMessageWhenSessionIdDiffersInCase() { verify(onTerminate, times(1)).run(); } + @Test + @Execution(ExecutionMode.SAME_THREAD) + public void shouldNotReDispositionWhenHandlerSettlesWithAutoCompleteEnabled() { + // Regression test for https://github.com/Azure/azure-sdk-for-java/issues/47356 + // When a session message handler settles the message manually (e.g. complete()) while auto-complete is + // left enabled, the redundant auto-settlement must short-circuit at the message.isSettled() guard and + // must NOT issue a second disposition on the receive-link (which would log a spurious + // DeliveryNotOnLinkException once the delivery has been removed from the link's DeliveryMap). + final String session1Id = "1"; + + final HashMap session1Messages = createMockMessages(session1Id, 1); + // Wire the mock message's settled flag so the isSettled() guard in updateDisposition() behaves like the + // real message: setIsSettled() flips it to true and isSettled() reflects the current value. + final ServiceBusReceivedMessage message = session1Messages.values().iterator().next(); + final AtomicBoolean settled = new AtomicBoolean(false); + when(message.isSettled()).thenAnswer(__ -> settled.get()); + doAnswer(__ -> { + settled.set(true); + return null; + }).when(message).setIsSettled(); + + final TestPublisher session1EpStates = TestPublisher.createCold(); + session1EpStates.next(AmqpEndpointState.ACTIVE); + final Session session1 = createMockSession(session1Id, session1Messages, session1EpStates); + when(session1.getLink().updateDisposition(any(), any())).thenReturn(Mono.empty()); + final MessageSerializer serializer = createMockmessageSerializer(session1Messages); + final ServiceBusSessionAcquirer sessionAcquirer = createMockSessionAcquirer(session1); + final Runnable onTerminate = createMockOnTerminate(); + + final int maxSessions = 1; + final int concurrency = 1; + final boolean autoDispositionDisabled = false; // auto-complete enabled. + // The handler settles the message manually, mirroring the customer scenario in the issue. + final Consumer processMessage = ServiceBusReceivedMessageContext::complete; + final Consumer processError = e -> { + }; + final SessionsMessagePump pump = createSessionsMessagePump(sessionAcquirer, idleTimeoutDisabled, maxSessions, + concurrency, autoDispositionDisabled, serializer, processMessage, processError, onTerminate); + + try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) { + verifier.create(() -> pump.begin()).thenAwait().thenCancel().verify(); + } + + // The successful manual disposition marks the message settled... + Assertions.assertTrue(message.isSettled()); + verify(message, times(1)).setIsSettled(); + // ...so the redundant auto-complete short-circuits at the isSettled() guard and the receive-link sees + // exactly ONE disposition (the manual one), never a second that would raise DeliveryNotOnLinkException. + verify(session1.getLink(), times(1)).updateDisposition(lockTokenCaptor.capture(), + deliveryStateCaptor.capture()); + Assertions.assertEquals(message.getLockToken(), lockTokenCaptor.getValue()); + Assertions.assertEquals(Accepted.getInstance(), deliveryStateCaptor.getValue()); + verify(session1.getLink(), times(1)).closeAsync(); + verify(onTerminate, times(1)).run(); + } + @Test @Execution(ExecutionMode.SAME_THREAD) public void shouldEmitErrorIfBeginInvokedMoreThanOnce() { From 0417c6c837a953b0688686d1a72ed98a4748e931 Mon Sep 17 00:00:00 2001 From: ksalazar-91 Date: Wed, 24 Jun 2026 14:18:17 -0700 Subject: [PATCH 3/3] [Service Bus] Use then(Mono.fromRunnable) to match sibling settle paths (#47356) Addresses PR review feedback: switch the session disposition success side-effect from doOnSuccess to .then(Mono.fromRunnable(() -> message.setIsSettled())), matching the pattern used by the non-session and V1 session paths in ServiceBusReceiverAsyncClient. Behavior is unchanged. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../com/azure/messaging/servicebus/SessionsMessagePump.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java index be876db0822d..091f81ebee2c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java @@ -902,7 +902,7 @@ private Mono updateDisposition(ServiceBusReceivedMessage message, Disposit // already been removed from the link's DeliveryMap. // See https://github.com/Azure/azure-sdk-for-java/issues/47356 updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState) - .doOnSuccess(__ -> message.setIsSettled()); + .then(Mono.fromRunnable(() -> message.setIsSettled())); } else { updateDispositionMono = Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(message.getLockToken(), deliveryState));