diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 5d8f9b0c2f..e7f33c87df 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -717,16 +717,18 @@ private ClientSubscriptionsManager( subscriptionTrackers.get(subscriptionId & 0xFF); if (subscriptionTracker != null) { // message at the beginning of the first chunk is ignored - // we "simulate" the processing - MessageHandlerContext messageHandlerContext = - new MessageHandlerContext( - offset, - chunkTimestamp, - committedChunkId, - subscriptionTracker.consumer, - (ConsumerFlowStrategy.MessageProcessedCallback) chunkContext); - ((ConsumerFlowStrategy.MessageProcessedCallback) chunkContext) - .processed(messageHandlerContext); + // we "simulate" the processing if possible + if (chunkContext != null) { + MessageHandlerContext messageHandlerContext = + new MessageHandlerContext( + offset, + chunkTimestamp, + committedChunkId, + subscriptionTracker.consumer, + (ConsumerFlowStrategy.MessageProcessedCallback) chunkContext); + ((ConsumerFlowStrategy.MessageProcessedCallback) chunkContext) + .processed(messageHandlerContext); + } } else { LOGGER.debug( "Could not find stream subscription {} in manager {}, node {} for message ignored listener",