File tree Expand file tree Collapse file tree 1 file changed +12
-10
lines changed
src/main/java/com/rabbitmq/stream/impl Expand file tree Collapse file tree 1 file changed +12
-10
lines changed Original file line number Diff line number Diff line change @@ -717,16 +717,18 @@ private ClientSubscriptionsManager(
717717 subscriptionTrackers .get (subscriptionId & 0xFF );
718718 if (subscriptionTracker != null ) {
719719 // message at the beginning of the first chunk is ignored
720- // we "simulate" the processing
721- MessageHandlerContext messageHandlerContext =
722- new MessageHandlerContext (
723- offset ,
724- chunkTimestamp ,
725- committedChunkId ,
726- subscriptionTracker .consumer ,
727- (ConsumerFlowStrategy .MessageProcessedCallback ) chunkContext );
728- ((ConsumerFlowStrategy .MessageProcessedCallback ) chunkContext )
729- .processed (messageHandlerContext );
720+ // we "simulate" the processing if possible
721+ if (chunkContext != null ) {
722+ MessageHandlerContext messageHandlerContext =
723+ new MessageHandlerContext (
724+ offset ,
725+ chunkTimestamp ,
726+ committedChunkId ,
727+ subscriptionTracker .consumer ,
728+ (ConsumerFlowStrategy .MessageProcessedCallback ) chunkContext );
729+ ((ConsumerFlowStrategy .MessageProcessedCallback ) chunkContext )
730+ .processed (messageHandlerContext );
731+ }
730732 } else {
731733 LOGGER .debug (
732734 "Could not find stream subscription {} in manager {}, node {} for message ignored listener" ,
You can’t perform that action at this time.
0 commit comments