diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index 3e97b8682687..09de727268ed 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -23,6 +23,12 @@ Premium large-message entities by reading the `com.microsoft:max-message-batch-size` vendor property from the AMQP sender link instead of using `max-message-size`. ([#48214](https://github.com/Azure/azure-sdk-for-java/pull/48214)) - Fixed `ServiceBusAdministrationClient.updateSubscription()` silently ignoring `defaultMessageTimeToLive` changes. The property was incorrectly nullified before serialization. ([#48495](https://github.com/Azure/azure-sdk-for-java/issues/48495)) +- Fixed session-enabled `ServiceBusProcessorClient` logging a spurious `DeliveryNotOnLinkException` + ("...does not exist in the link's DeliveryMap") at ERROR when a message handler settles a message + manually (e.g. `complete()`) while auto-complete is left enabled. The V2 session disposition path now + marks the message settled on success, so the redundant auto-settlement short-circuits at the + already-settled guard instead of attempting a second disposition on the receive-link. The message was + always settled correctly; only the misleading error log is removed. ([#47356](https://github.com/Azure/azure-sdk-for-java/issues/47356)) ### Other Changes diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java index 3dd1febd98bb..091f81ebee2c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java @@ -894,7 +894,15 @@ private Mono updateDisposition(ServiceBusReceivedMessage message, Disposit Mono updateDispositionMono; if (receiver != null) { - updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState); + // Mark the message settled once the broker acknowledges the disposition, mirroring the non-session + // and V1 session paths (ServiceBusReceiverAsyncClient). This arms the message.isSettled() guard above + // so that a redundant settlement attempt (e.g. when the handler calls complete() manually AND + // auto-complete is left enabled) short-circuits here with a benign "already settled" error instead of + // reaching the receive-link and logging a spurious DeliveryNotOnLinkException once the delivery has + // already been removed from the link's DeliveryMap. + // See https://github.com/Azure/azure-sdk-for-java/issues/47356 + updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState) + .then(Mono.fromRunnable(() -> message.setIsSettled())); } else { updateDispositionMono = Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(message.getLockToken(), deliveryState)); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java index 5d5003efe4a6..9c8cf0e77c3a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java @@ -48,6 +48,7 @@ import java.util.function.Supplier; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -501,6 +502,62 @@ public void shouldCompleteMessageWhenSessionIdDiffersInCase() { verify(onTerminate, times(1)).run(); } + @Test + @Execution(ExecutionMode.SAME_THREAD) + public void shouldNotReDispositionWhenHandlerSettlesWithAutoCompleteEnabled() { + // Regression test for https://github.com/Azure/azure-sdk-for-java/issues/47356 + // When a session message handler settles the message manually (e.g. complete()) while auto-complete is + // left enabled, the redundant auto-settlement must short-circuit at the message.isSettled() guard and + // must NOT issue a second disposition on the receive-link (which would log a spurious + // DeliveryNotOnLinkException once the delivery has been removed from the link's DeliveryMap). + final String session1Id = "1"; + + final HashMap session1Messages = createMockMessages(session1Id, 1); + // Wire the mock message's settled flag so the isSettled() guard in updateDisposition() behaves like the + // real message: setIsSettled() flips it to true and isSettled() reflects the current value. + final ServiceBusReceivedMessage message = session1Messages.values().iterator().next(); + final AtomicBoolean settled = new AtomicBoolean(false); + when(message.isSettled()).thenAnswer(__ -> settled.get()); + doAnswer(__ -> { + settled.set(true); + return null; + }).when(message).setIsSettled(); + + final TestPublisher session1EpStates = TestPublisher.createCold(); + session1EpStates.next(AmqpEndpointState.ACTIVE); + final Session session1 = createMockSession(session1Id, session1Messages, session1EpStates); + when(session1.getLink().updateDisposition(any(), any())).thenReturn(Mono.empty()); + final MessageSerializer serializer = createMockmessageSerializer(session1Messages); + final ServiceBusSessionAcquirer sessionAcquirer = createMockSessionAcquirer(session1); + final Runnable onTerminate = createMockOnTerminate(); + + final int maxSessions = 1; + final int concurrency = 1; + final boolean autoDispositionDisabled = false; // auto-complete enabled. + // The handler settles the message manually, mirroring the customer scenario in the issue. + final Consumer processMessage = ServiceBusReceivedMessageContext::complete; + final Consumer processError = e -> { + }; + final SessionsMessagePump pump = createSessionsMessagePump(sessionAcquirer, idleTimeoutDisabled, maxSessions, + concurrency, autoDispositionDisabled, serializer, processMessage, processError, onTerminate); + + try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) { + verifier.create(() -> pump.begin()).thenAwait().thenCancel().verify(); + } + + // The successful manual disposition marks the message settled... + Assertions.assertTrue(message.isSettled()); + verify(message, times(1)).setIsSettled(); + // ...so the redundant auto-complete short-circuits at the isSettled() guard and the receive-link sees + // exactly ONE disposition (the manual one), never a second that would raise DeliveryNotOnLinkException. + verify(session1.getLink(), times(1)).updateDisposition(lockTokenCaptor.capture(), + deliveryStateCaptor.capture()); + Assertions.assertEquals(message.getLockToken(), lockTokenCaptor.getValue()); + Assertions.assertEquals(Accepted.getInstance(), deliveryStateCaptor.getValue()); + verify(session1.getLink(), times(1)).closeAsync(); + verify(onTerminate, times(1)).run(); + } + @Test @Execution(ExecutionMode.SAME_THREAD) public void shouldEmitErrorIfBeginInvokedMoreThanOnce() {