Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
Premium large-message entities by reading the `com.microsoft:max-message-batch-size` vendor property
from the AMQP sender link instead of using `max-message-size`. ([#48214](https://github.com/Azure/azure-sdk-for-java/pull/48214))
- Fixed `ServiceBusAdministrationClient.updateSubscription()` silently ignoring `defaultMessageTimeToLive` changes. The property was incorrectly nullified before serialization. ([#48495](https://github.com/Azure/azure-sdk-for-java/issues/48495))
- Fixed session-enabled `ServiceBusProcessorClient` logging a spurious `DeliveryNotOnLinkException`
("...does not exist in the link's DeliveryMap") at ERROR when a message handler settles a message
manually (e.g. `complete()`) while auto-complete is left enabled. The V2 session disposition path now
marks the message settled on success, so the redundant auto-settlement short-circuits at the
already-settled guard instead of attempting a second disposition on the receive-link. The message was
always settled correctly; only the misleading error log is removed. ([#47356](https://github.com/Azure/azure-sdk-for-java/issues/47356))

### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,15 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit

Mono<Void> updateDispositionMono;
if (receiver != null) {
updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState);
// Mark the message settled once the broker acknowledges the disposition, mirroring the non-session
// and V1 session paths (ServiceBusReceiverAsyncClient). This arms the message.isSettled() guard above
// so that a redundant settlement attempt (e.g. when the handler calls complete() manually AND
// auto-complete is left enabled) short-circuits here with a benign "already settled" error instead of
// reaching the receive-link and logging a spurious DeliveryNotOnLinkException once the delivery has
// already been removed from the link's DeliveryMap.
// See https://github.com/Azure/azure-sdk-for-java/issues/47356
updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState)
.<Void>then(Mono.fromRunnable(() -> message.setIsSettled()));
} else {
updateDispositionMono
= Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(message.getLockToken(), deliveryState));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.function.Supplier;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -501,6 +502,62 @@ public void shouldCompleteMessageWhenSessionIdDiffersInCase() {
verify(onTerminate, times(1)).run();
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
public void shouldNotReDispositionWhenHandlerSettlesWithAutoCompleteEnabled() {
// Regression test for https://github.com/Azure/azure-sdk-for-java/issues/47356
// When a session message handler settles the message manually (e.g. complete()) while auto-complete is
// left enabled, the redundant auto-settlement must short-circuit at the message.isSettled() guard and
// must NOT issue a second disposition on the receive-link (which would log a spurious
// DeliveryNotOnLinkException once the delivery has been removed from the link's DeliveryMap).
final String session1Id = "1";

final HashMap<Message, ServiceBusReceivedMessage> session1Messages = createMockMessages(session1Id, 1);
// Wire the mock message's settled flag so the isSettled() guard in updateDisposition() behaves like the
// real message: setIsSettled() flips it to true and isSettled() reflects the current value.
final ServiceBusReceivedMessage message = session1Messages.values().iterator().next();
final AtomicBoolean settled = new AtomicBoolean(false);
when(message.isSettled()).thenAnswer(__ -> settled.get());
doAnswer(__ -> {
settled.set(true);
return null;
}).when(message).setIsSettled();

final TestPublisher<AmqpEndpointState> session1EpStates = TestPublisher.createCold();
session1EpStates.next(AmqpEndpointState.ACTIVE);
final Session session1 = createMockSession(session1Id, session1Messages, session1EpStates);
when(session1.getLink().updateDisposition(any(), any())).thenReturn(Mono.empty());
final MessageSerializer serializer = createMockmessageSerializer(session1Messages);
final ServiceBusSessionAcquirer sessionAcquirer = createMockSessionAcquirer(session1);
final Runnable onTerminate = createMockOnTerminate();

final int maxSessions = 1;
final int concurrency = 1;
final boolean autoDispositionDisabled = false; // auto-complete enabled.
// The handler settles the message manually, mirroring the customer scenario in the issue.
final Consumer<ServiceBusReceivedMessageContext> processMessage = ServiceBusReceivedMessageContext::complete;
final Consumer<ServiceBusErrorContext> processError = e -> {
};
final SessionsMessagePump pump = createSessionsMessagePump(sessionAcquirer, idleTimeoutDisabled, maxSessions,
concurrency, autoDispositionDisabled, serializer, processMessage, processError, onTerminate);

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
verifier.create(() -> pump.begin()).thenAwait().thenCancel().verify();
}

// The successful manual disposition marks the message settled...
Assertions.assertTrue(message.isSettled());
verify(message, times(1)).setIsSettled();
// ...so the redundant auto-complete short-circuits at the isSettled() guard and the receive-link sees
// exactly ONE disposition (the manual one), never a second that would raise DeliveryNotOnLinkException.
verify(session1.getLink(), times(1)).updateDisposition(lockTokenCaptor.capture(),
deliveryStateCaptor.capture());
Assertions.assertEquals(message.getLockToken(), lockTokenCaptor.getValue());
Assertions.assertEquals(Accepted.getInstance(), deliveryStateCaptor.getValue());
verify(session1.getLink(), times(1)).closeAsync();
verify(onTerminate, times(1)).run();
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
public void shouldEmitErrorIfBeginInvokedMoreThanOnce() {
Expand Down
Loading