diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java index 8f5073bb3aa..44f3a309cc2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java @@ -55,8 +55,10 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser private final ProxyRelayService proxyRelayService; private final MetadataService metadataService; private final AdminService adminService; + private final LiteSubscriptionService liteSubscriptionService; private final MQClientAPIFactory mqClientAPIFactory; + private final MQClientAPIFactory liteSubscriptionAPIFactory; private final ChannelManager channelManager; private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor( @@ -82,11 +84,23 @@ public LocalServiceManager(BrokerController brokerController, RPCHook rpcHook) { this.proxyRelayService = new LocalProxyRelayService(brokerController, this.transactionService); this.metadataService = new LocalMetadataService(brokerController); this.adminService = new DefaultAdminService(this.mqClientAPIFactory); + + // Lite subscriptions use a separate channel + this.liteSubscriptionAPIFactory = new MQClientAPIFactory( + nameserverAccessConfig, + "LiteSubscription_", + 1, + new DoNothingClientRemotingProcessor(null), + rpcHook, + scheduledExecutorService); + this.liteSubscriptionService = new LiteSubscriptionService(this.topicRouteService, this.liteSubscriptionAPIFactory); + this.init(); } protected void init() { this.appendStartAndShutdown(this.mqClientAPIFactory); + this.appendStartAndShutdown(this.liteSubscriptionAPIFactory); this.appendStartAndShutdown(this.topicRouteService); this.appendStartAndShutdown(new LocalServiceManagerStartAndShutdown()); } @@ -133,7 +147,7 @@ public AdminService getAdminService() { @Override public LiteSubscriptionService getLiteSubscriptionService() { - return null; + return liteSubscriptionService; } private class LocalServiceManagerStartAndShutdown implements StartAndShutdown { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index c93fa93983c..662652f3e09 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.client.consumer.AckResult; import org.apache.rocketmq.client.consumer.AckStatus; @@ -35,6 +36,7 @@ import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.Message; @@ -68,6 +70,7 @@ import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PopLiteMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.PopLiteMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; @@ -200,7 +203,76 @@ public CompletableFuture endTransactionOneway(ProxyContext ctx, String bro @Override public CompletableFuture popLiteMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, PopLiteMessageRequestHeader requestHeader, long timeoutMillis) { - throw new NotImplementedException(); + RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_LITE_MESSAGE, requestHeader, ctx.getLanguage()); + CompletableFuture future = new CompletableFuture<>(); + SimpleChannel channel = channelManager.createInvocationChannel(ctx); + InvocationContext invocationContext = new InvocationContext(future); + channel.registerInvocationContext(request.getOpaque(), invocationContext); + ChannelHandlerContext simpleChannelHandlerContext = channel.getChannelHandlerContext(); + try { + RemotingCommand response = brokerController.getPopLiteMessageProcessor().processRequest(simpleChannelHandlerContext, request); + if (response != null) { + invocationContext.handle(response); + channel.eraseInvocationContext(request.getOpaque()); + } + } catch (Exception e) { + future.completeExceptionally(e); + channel.eraseInvocationContext(request.getOpaque()); + log.error("Failed to process popMessage command", e); + } + return future.thenApply(r -> { + // @see org.apache.rocketmq.client.impl.MQClientAPIImpl#processPopLiteResponse + PopStatus popStatus; + List messageExtList = new ArrayList<>(); + switch (r.getCode()) { + case ResponseCode.SUCCESS: + popStatus = PopStatus.FOUND; + ByteBuffer byteBuffer = ByteBuffer.wrap(r.getBody()); + messageExtList = MessageDecoder.decodesBatch( + byteBuffer, + true, + false, + true + ); + break; + case ResponseCode.POLLING_FULL: + popStatus = PopStatus.POLLING_FULL; + break; + case ResponseCode.POLLING_TIMEOUT: + case ResponseCode.PULL_NOT_FOUND: + popStatus = PopStatus.POLLING_NOT_FOUND; + break; + default: + throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, r.getRemark()); + } + PopResult popResult = new PopResult(popStatus, messageExtList); + if (popStatus != PopStatus.FOUND) { + return popResult; + } + + PopLiteMessageResponseHeader responseHeader = (PopLiteMessageResponseHeader) r.readCustomHeader(); + List orderCountList = ExtraInfoUtil.parseLiteOrderCountInfo(responseHeader.getOrderCountInfo(), messageExtList.size()); + for (int i = 0; i < messageExtList.size(); i++) { + MessageExt messageExt = messageExtList.get(i); + String[] queues = StringUtils.split( + messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH), MixAll.LMQ_DISPATCH_SEPARATOR); + String[] queueOffsets = StringUtils.split( + messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), MixAll.LMQ_DISPATCH_SEPARATOR); + + if (null == queues || null == queueOffsets || queues.length != 1 || queues.length != queueOffsets.length) { + continue; + } + messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, + ExtraInfoUtil.buildExtraInfo(0, responseHeader.getPopTime(), responseHeader.getInvisibleTime(), + responseHeader.getReviveQid(), messageQueue.getTopic(), messageQueue.getBrokerName(), 0, Long.parseLong(queueOffsets[0]))); + messageExt.getProperties().computeIfAbsent( + MessageConst.PROPERTY_FIRST_POP_TIME, k -> String.valueOf(responseHeader.getPopTime())); + messageExt.setBrokerName(messageQueue.getBrokerName()); + messageExt.setReconsumeTimes(orderCountList != null ? orderCountList.get(i) : 0); + messageExt.setQueueOffset(Long.parseLong(queueOffsets[0])); + } + return popResult; + }); } @Override diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java index 52ba521f802..c4e07debb4c 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.broker.processor.AckMessageProcessor; import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor; import org.apache.rocketmq.broker.processor.EndTransactionProcessor; +import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor; import org.apache.rocketmq.broker.processor.PopMessageProcessor; import org.apache.rocketmq.broker.processor.RecallMessageProcessor; import org.apache.rocketmq.broker.processor.SendMessageProcessor; @@ -46,6 +47,7 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -68,6 +70,8 @@ import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; +import org.apache.rocketmq.remoting.protocol.header.PopLiteMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.PopLiteMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; @@ -95,6 +99,8 @@ public class LocalMessageServiceTest extends InitConfigTest { @Mock private PopMessageProcessor popMessageProcessorMock; @Mock + private PopLiteMessageProcessor popLiteMessageProcessorMock; + @Mock private ChangeInvisibleTimeProcessor changeInvisibleTimeProcessorMock; @Mock private AckMessageProcessor ackMessageProcessorMock; @@ -126,6 +132,7 @@ public void setUp() throws Throwable { channelManager = new ChannelManager(); Mockito.when(brokerControllerMock.getSendMessageProcessor()).thenReturn(sendMessageProcessorMock); Mockito.when(brokerControllerMock.getPopMessageProcessor()).thenReturn(popMessageProcessorMock); + Mockito.when(brokerControllerMock.getPopLiteMessageProcessor()).thenReturn(popLiteMessageProcessorMock); Mockito.when(brokerControllerMock.getChangeInvisibleTimeProcessor()).thenReturn(changeInvisibleTimeProcessorMock); Mockito.when(brokerControllerMock.getAckMessageProcessor()).thenReturn(ackMessageProcessorMock); Mockito.when(brokerControllerMock.getEndTransactionProcessor()).thenReturn(endTransactionProcessorMock); @@ -277,6 +284,67 @@ public void testEndTransaction() throws Exception { })); } + @Test + public void testPopLiteMessageWriteAndFlush() throws Exception { + int reviveQueueId = 1; + long popTime = System.currentTimeMillis(); + long invisibleTime = 3000L; + long startOffset = 100L; + long restNum = 0L; + List messageExtList = new ArrayList<>(); + MessageExt message1 = buildMessageExt(topic, 0, startOffset); + message1.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, "%LMQ%$topic$lite"); + message1.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, "0"); + messageExtList.add(message1); + byte[] body1 = MessageDecoder.encode(message1, false); + MessageExt message2 = buildMessageExt(topic, 0, startOffset + 1); + message2.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, "%LMQ%$topic$lite"); + message2.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, "1"); + messageExtList.add(message2); + byte[] body2 = MessageDecoder.encode(message2, false); + ByteBuffer byteBuffer1 = ByteBuffer.wrap(body1); + ByteBuffer byteBuffer2 = ByteBuffer.wrap(body2); + ByteBuffer b3 = ByteBuffer.allocate(byteBuffer1.limit() + byteBuffer2.limit()); + b3.put(byteBuffer1); + b3.put(byteBuffer2); + PopLiteMessageRequestHeader requestHeader = new PopLiteMessageRequestHeader(); + requestHeader.setInvisibleTime(invisibleTime); + Mockito.when(popLiteMessageProcessorMock.processRequest(Mockito.any(SimpleChannelHandlerContext.class), Mockito.argThat(argument -> { + boolean first = argument.getCode() == RequestCode.POP_LITE_MESSAGE; + boolean second = argument.readCustomHeader() instanceof PopLiteMessageRequestHeader; + return first && second; + }))).thenAnswer(invocation -> { + SimpleChannelHandlerContext simpleChannelHandlerContext = invocation.getArgument(0); + RemotingCommand request = invocation.getArgument(1); + RemotingCommand response = RemotingCommand.createResponseCommand(PopLiteMessageResponseHeader.class); + response.setOpaque(request.getOpaque()); + response.setCode(ResponseCode.SUCCESS); + response.setBody(b3.array()); + PopLiteMessageResponseHeader responseHeader = (PopLiteMessageResponseHeader) response.readCustomHeader(); + responseHeader.setInvisibleTime(requestHeader.getInvisibleTime()); + responseHeader.setPopTime(popTime); + responseHeader.setReviveQid(reviveQueueId); + simpleChannelHandlerContext.writeAndFlush(response); + return null; + }); + MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId); + CompletableFuture future = localMessageService.popLiteMessage(proxyContext, new AddressableMessageQueue(messageQueue, ""), requestHeader, 1000L); + PopResult popResult = future.get(); + assertThat(popResult.getPopTime()).isEqualTo(0); + assertThat(popResult.getInvisibleTime()).isEqualTo(0); + assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND); + assertThat(popResult.getRestNum()).isEqualTo(restNum); + assertThat(popResult.getMsgFoundList().size()).isEqualTo(messageExtList.size()); + for (int i = 0; i < popResult.getMsgFoundList().size(); i++) { + MessageExt messageResult = popResult.getMsgFoundList().get(i); + assertThat(messageResult.getBody()).isEqualTo(messageExtList.get(i).getBody()); + assertThat(messageResult.getTopic()).isEqualTo(messageExtList.get(i).getTopic()); + assertThat(messageResult.getQueueId()).isEqualTo(messageExtList.get(i).getQueueId()); + assertThat(messageResult.getQueueOffset()).isEqualTo(Long.valueOf(i)); + assertThat(messageResult.getBrokerName()).isEqualTo(brokerName); + } + } + @Test public void testPopMessageWriteAndFlush() throws Exception { int reviveQueueId = 1;