Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
private final ProxyRelayService proxyRelayService;
private final MetadataService metadataService;
private final AdminService adminService;
private final LiteSubscriptionService liteSubscriptionService;

private final MQClientAPIFactory mqClientAPIFactory;
private final MQClientAPIFactory liteSubscriptionAPIFactory;
private final ChannelManager channelManager;

private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(
Expand All @@ -82,11 +84,23 @@ public LocalServiceManager(BrokerController brokerController, RPCHook rpcHook) {
this.proxyRelayService = new LocalProxyRelayService(brokerController, this.transactionService);
this.metadataService = new LocalMetadataService(brokerController);
this.adminService = new DefaultAdminService(this.mqClientAPIFactory);

// Lite subscriptions use a separate channel
this.liteSubscriptionAPIFactory = new MQClientAPIFactory(
nameserverAccessConfig,
"LiteSubscription_",
1,
new DoNothingClientRemotingProcessor(null),
rpcHook,
scheduledExecutorService);
this.liteSubscriptionService = new LiteSubscriptionService(this.topicRouteService, this.liteSubscriptionAPIFactory);

this.init();
}

protected void init() {
this.appendStartAndShutdown(this.mqClientAPIFactory);
this.appendStartAndShutdown(this.liteSubscriptionAPIFactory);
this.appendStartAndShutdown(this.topicRouteService);
this.appendStartAndShutdown(new LocalServiceManagerStartAndShutdown());
}
Expand Down Expand Up @@ -133,7 +147,7 @@ public AdminService getAdminService() {

@Override
public LiteSubscriptionService getLiteSubscriptionService() {
return null;
return liteSubscriptionService;
}

private class LocalServiceManagerStartAndShutdown implements StartAndShutdown {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
Expand All @@ -35,6 +36,7 @@
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
Expand Down Expand Up @@ -68,6 +70,7 @@
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopLiteMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopLiteMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
Expand Down Expand Up @@ -200,7 +203,76 @@ public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String bro
@Override
public CompletableFuture<PopResult> popLiteMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
PopLiteMessageRequestHeader requestHeader, long timeoutMillis) {
throw new NotImplementedException();
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_LITE_MESSAGE, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
InvocationContext invocationContext = new InvocationContext(future);
channel.registerInvocationContext(request.getOpaque(), invocationContext);
ChannelHandlerContext simpleChannelHandlerContext = channel.getChannelHandlerContext();
try {
RemotingCommand response = brokerController.getPopLiteMessageProcessor().processRequest(simpleChannelHandlerContext, request);
if (response != null) {
invocationContext.handle(response);
channel.eraseInvocationContext(request.getOpaque());
}
} catch (Exception e) {
future.completeExceptionally(e);
channel.eraseInvocationContext(request.getOpaque());
log.error("Failed to process popMessage command", e);
}
return future.thenApply(r -> {
// @see org.apache.rocketmq.client.impl.MQClientAPIImpl#processPopLiteResponse
PopStatus popStatus;
List<MessageExt> messageExtList = new ArrayList<>();
switch (r.getCode()) {
case ResponseCode.SUCCESS:
popStatus = PopStatus.FOUND;
ByteBuffer byteBuffer = ByteBuffer.wrap(r.getBody());
messageExtList = MessageDecoder.decodesBatch(
byteBuffer,
true,
false,
true
);
break;
case ResponseCode.POLLING_FULL:
popStatus = PopStatus.POLLING_FULL;
break;
case ResponseCode.POLLING_TIMEOUT:
case ResponseCode.PULL_NOT_FOUND:
popStatus = PopStatus.POLLING_NOT_FOUND;
break;
default:
throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, r.getRemark());
}
PopResult popResult = new PopResult(popStatus, messageExtList);
if (popStatus != PopStatus.FOUND) {
return popResult;
}

PopLiteMessageResponseHeader responseHeader = (PopLiteMessageResponseHeader) r.readCustomHeader();
List<Integer> orderCountList = ExtraInfoUtil.parseLiteOrderCountInfo(responseHeader.getOrderCountInfo(), messageExtList.size());
for (int i = 0; i < messageExtList.size(); i++) {
MessageExt messageExt = messageExtList.get(i);
String[] queues = StringUtils.split(
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH), MixAll.LMQ_DISPATCH_SEPARATOR);
String[] queueOffsets = StringUtils.split(
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), MixAll.LMQ_DISPATCH_SEPARATOR);

if (null == queues || null == queueOffsets || queues.length != 1 || queues.length != queueOffsets.length) {
continue;
}
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
ExtraInfoUtil.buildExtraInfo(0, responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
responseHeader.getReviveQid(), messageQueue.getTopic(), messageQueue.getBrokerName(), 0, Long.parseLong(queueOffsets[0])));
messageExt.getProperties().computeIfAbsent(
MessageConst.PROPERTY_FIRST_POP_TIME, k -> String.valueOf(responseHeader.getPopTime()));
messageExt.setBrokerName(messageQueue.getBrokerName());
messageExt.setReconsumeTimes(orderCountList != null ? orderCountList.get(i) : 0);
messageExt.setQueueOffset(Long.parseLong(queueOffsets[0]));
}
return popResult;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor;
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
import org.apache.rocketmq.broker.processor.RecallMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
Expand All @@ -46,6 +47,7 @@
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
Expand All @@ -68,6 +70,8 @@
import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.PopLiteMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopLiteMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader;
Expand Down Expand Up @@ -95,6 +99,8 @@ public class LocalMessageServiceTest extends InitConfigTest {
@Mock
private PopMessageProcessor popMessageProcessorMock;
@Mock
private PopLiteMessageProcessor popLiteMessageProcessorMock;
@Mock
private ChangeInvisibleTimeProcessor changeInvisibleTimeProcessorMock;
@Mock
private AckMessageProcessor ackMessageProcessorMock;
Expand Down Expand Up @@ -126,6 +132,7 @@ public void setUp() throws Throwable {
channelManager = new ChannelManager();
Mockito.when(brokerControllerMock.getSendMessageProcessor()).thenReturn(sendMessageProcessorMock);
Mockito.when(brokerControllerMock.getPopMessageProcessor()).thenReturn(popMessageProcessorMock);
Mockito.when(brokerControllerMock.getPopLiteMessageProcessor()).thenReturn(popLiteMessageProcessorMock);
Mockito.when(brokerControllerMock.getChangeInvisibleTimeProcessor()).thenReturn(changeInvisibleTimeProcessorMock);
Mockito.when(brokerControllerMock.getAckMessageProcessor()).thenReturn(ackMessageProcessorMock);
Mockito.when(brokerControllerMock.getEndTransactionProcessor()).thenReturn(endTransactionProcessorMock);
Expand Down Expand Up @@ -277,6 +284,67 @@ public void testEndTransaction() throws Exception {
}));
}

@Test
public void testPopLiteMessageWriteAndFlush() throws Exception {
int reviveQueueId = 1;
long popTime = System.currentTimeMillis();
long invisibleTime = 3000L;
long startOffset = 100L;
long restNum = 0L;
List<MessageExt> messageExtList = new ArrayList<>();
MessageExt message1 = buildMessageExt(topic, 0, startOffset);
message1.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, "%LMQ%$topic$lite");
message1.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, "0");
messageExtList.add(message1);
byte[] body1 = MessageDecoder.encode(message1, false);
MessageExt message2 = buildMessageExt(topic, 0, startOffset + 1);
message2.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, "%LMQ%$topic$lite");
message2.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, "1");
messageExtList.add(message2);
byte[] body2 = MessageDecoder.encode(message2, false);
ByteBuffer byteBuffer1 = ByteBuffer.wrap(body1);
ByteBuffer byteBuffer2 = ByteBuffer.wrap(body2);
ByteBuffer b3 = ByteBuffer.allocate(byteBuffer1.limit() + byteBuffer2.limit());
b3.put(byteBuffer1);
b3.put(byteBuffer2);
PopLiteMessageRequestHeader requestHeader = new PopLiteMessageRequestHeader();
requestHeader.setInvisibleTime(invisibleTime);
Mockito.when(popLiteMessageProcessorMock.processRequest(Mockito.any(SimpleChannelHandlerContext.class), Mockito.argThat(argument -> {
boolean first = argument.getCode() == RequestCode.POP_LITE_MESSAGE;
boolean second = argument.readCustomHeader() instanceof PopLiteMessageRequestHeader;
return first && second;
}))).thenAnswer(invocation -> {
SimpleChannelHandlerContext simpleChannelHandlerContext = invocation.getArgument(0);
RemotingCommand request = invocation.getArgument(1);
RemotingCommand response = RemotingCommand.createResponseCommand(PopLiteMessageResponseHeader.class);
response.setOpaque(request.getOpaque());
response.setCode(ResponseCode.SUCCESS);
response.setBody(b3.array());
PopLiteMessageResponseHeader responseHeader = (PopLiteMessageResponseHeader) response.readCustomHeader();
responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
responseHeader.setPopTime(popTime);
responseHeader.setReviveQid(reviveQueueId);
simpleChannelHandlerContext.writeAndFlush(response);
return null;
});
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
CompletableFuture<PopResult> future = localMessageService.popLiteMessage(proxyContext, new AddressableMessageQueue(messageQueue, ""), requestHeader, 1000L);
PopResult popResult = future.get();
assertThat(popResult.getPopTime()).isEqualTo(0);
assertThat(popResult.getInvisibleTime()).isEqualTo(0);
assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND);
assertThat(popResult.getRestNum()).isEqualTo(restNum);
assertThat(popResult.getMsgFoundList().size()).isEqualTo(messageExtList.size());
for (int i = 0; i < popResult.getMsgFoundList().size(); i++) {
MessageExt messageResult = popResult.getMsgFoundList().get(i);
assertThat(messageResult.getBody()).isEqualTo(messageExtList.get(i).getBody());
assertThat(messageResult.getTopic()).isEqualTo(messageExtList.get(i).getTopic());
assertThat(messageResult.getQueueId()).isEqualTo(messageExtList.get(i).getQueueId());
assertThat(messageResult.getQueueOffset()).isEqualTo(Long.valueOf(i));
assertThat(messageResult.getBrokerName()).isEqualTo(brokerName);
}
}

@Test
public void testPopMessageWriteAndFlush() throws Exception {
int reviveQueueId = 1;
Expand Down
Loading