diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index c26f6bc2ef4..e224d647f0b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -66,7 +66,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - + protected final MessagingProcessor messagingProcessor; protected final RemotingChannelManager remotingChannelManager; protected final ChannelEventListener clientHousekeepingService; @@ -90,12 +90,12 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu protected final ScheduledExecutorService timerExecutor; protected final TlsCertificateManager tlsCertificateManager; protected final RemotingTlsReloadHandler tlsReloadHandler; - - + + public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertificateManager tlsCertificateManager) throws Exception { this.messagingProcessor = messagingProcessor; this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService()); - + RequestPipeline pipeline = createRequestPipeline(messagingProcessor); this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor); this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager); @@ -197,7 +197,7 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertific this.registerRemotingServer(this.defaultRemotingServer); } - + protected class RemotingTlsReloadHandler implements TlsCertificateManager.TlsContextReloadListener { @Override public void onTlsContextReload() { @@ -207,7 +207,7 @@ public void onTlsContextReload() { } } } - + protected void registerRemotingServer(RemotingServer remotingServer) { remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageActivity, this.sendMessageExecutor); remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageActivity, this.sendMessageExecutor); @@ -230,6 +230,7 @@ protected void registerRemotingServer(RemotingServer remotingServer) { remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, consumerManagerActivity, this.updateOffsetExecutor); remotingServer.registerProcessor(RequestCode.GET_CONSUMER_CONNECTION_LIST, consumerManagerActivity, this.updateOffsetExecutor); + remotingServer.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, consumerManagerActivity, this.defaultExecutor); remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManagerActivity, this.defaultExecutor); remotingServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManagerActivity, this.defaultExecutor); remotingServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManagerActivity, this.defaultExecutor); @@ -240,7 +241,7 @@ protected void registerRemotingServer(RemotingServer remotingServer) { remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, getTopicRouteActivity, this.topicRouteExecutor); } - + @Override public void shutdown() throws Exception { // Unregister the TLS context reload handler @@ -255,7 +256,7 @@ public void shutdown() throws Exception { this.topicRouteExecutor.shutdown(); this.defaultExecutor.shutdown(); } - + @Override public void start() throws Exception { // Register the TLS context reload handler @@ -264,7 +265,7 @@ public void start() throws Exception { this.remotingChannelManager.start(); this.defaultRemotingServer.start(); } - + @Override public CompletableFuture invokeToClient(Channel channel, RemotingCommand request, long timeoutMillis) { @@ -291,7 +292,7 @@ public void operationFail(Throwable throwable) { } return future; } - + protected RequestPipeline createRequestPipeline(MessagingProcessor messagingProcessor) { RequestPipeline pipeline = (ctx, request, context) -> { }; @@ -304,7 +305,7 @@ protected RequestPipeline createRequestPipeline(MessagingProcessor messagingProc } return pipeline.pipe(new ContextInitPipeline()); } - + protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor { private final long maxWaitTimeMillsInQueue; @@ -328,7 +329,7 @@ public boolean needPrintJstack(ThreadPoolExecutor executor, double value) { return value > maxWaitTimeMillsInQueue; } } - + protected long headSlowTimeMills(BlockingQueue q) { try { long slowTimeMills = 0; @@ -348,7 +349,7 @@ protected long headSlowTimeMills(BlockingQueue q) { } return -1; } - + protected void cleanExpireRequest() { ProxyConfig config = ConfigurationManager.getProxyConfig(); @@ -359,7 +360,7 @@ protected void cleanExpireRequest() { cleanExpiredRequestInQueue(this.topicRouteExecutor, config.getRemotingWaitTimeMillsInTopicRouteQueue()); cleanExpiredRequestInQueue(this.defaultExecutor, config.getRemotingWaitTimeMillsInDefaultQueue()); } - + protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor, long maxWaitTimeMillsInQueue) { while (true) { try { @@ -391,7 +392,7 @@ protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor, } } } - + private RequestTask castRunnable(final Runnable runnable) { try { if (runnable instanceof FutureTaskExt) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java index ce1f1b4a514..ec2afd7b619 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java @@ -25,30 +25,36 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.ExceptionUtils; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.proxy.service.relay.ProxyChannel; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader; -import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; public class ConsumerManagerActivity extends AbstractRemotingActivity { public ConsumerManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) { super(requestPipeline, messagingProcessor); } - + @Override protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { @@ -73,12 +79,15 @@ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCom case RequestCode.GET_CONSUMER_CONNECTION_LIST: { return getConsumerConnectionList(ctx, request, context); } + case RequestCode.GET_CONSUMER_RUNNING_INFO: { + return getConsumerRunningInfo(ctx, request, context); + } default: break; } return null; } - + protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class); @@ -91,7 +100,7 @@ protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, Remo response.setCode(ResponseCode.SUCCESS); return response; } - + protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -128,7 +137,40 @@ protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, R response.setRemark("the consumer group[" + header.getConsumerGroup() + "] not online"); return response; } + + protected RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + RemotingCommand response = RemotingCommand.createResponseCommand(null); + GetConsumerRunningInfoRequestHeader header = + (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); + ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup()); + ClientChannelInfo clientChannelInfo = null; + if (consumerGroupInfo != null) { + clientChannelInfo = consumerGroupInfo.findChannel(header.getClientId()); + } + if (clientChannelInfo == null || !(clientChannelInfo.getChannel() instanceof ProxyChannel)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("The Consumer <%s> <%s> not online", header.getConsumerGroup(), header.getClientId())); + return response; + } + CompletableFuture> relayFuture = + ((ProxyChannel) clientChannelInfo.getChannel()).processGetConsumerRunningInfo(request, header); + relayFuture.thenAccept(result -> { + RemotingCommand relayResponse = RemotingCommand.createResponseCommand(null); + relayResponse.setCode(result.getCode()); + relayResponse.setRemark(result.getRemark()); + if (result.getCode() == ResponseCode.SUCCESS && result.getResult() != null) { + relayResponse.setBody(result.getResult().encode()); + } + writeResponse(ctx, context, request, relayResponse); + }).exceptionally(t -> { + writeErrResponse(ctx, context, request, ExceptionUtils.getRealException(t)); + return null; + }); + return null; + } + protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { final RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -149,7 +191,7 @@ protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand }); return null; } - + protected RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java index 71ce222a8c0..4e1d2bc272d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java @@ -29,18 +29,18 @@ * not implement yet */ public class ClusterProxyRelayService extends AbstractProxyRelayService { - + public ClusterProxyRelayService(TransactionService transactionService) { super(transactionService); } - + @Override public CompletableFuture> processGetConsumerRunningInfo( ProxyContext context, RemotingCommand command, GetConsumerRunningInfoRequestHeader header) { - return null; + return new CompletableFuture<>(); } - + @Override public CompletableFuture> processConsumeMessageDirectly( ProxyContext context, RemotingCommand command, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java index 72fdfd0259a..75df0570f89 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java @@ -52,9 +52,9 @@ public abstract class ProxyChannel extends SimpleChannel { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected final SocketAddress remoteSocketAddress; protected final SocketAddress localSocketAddress; - + protected final ProxyRelayService proxyRelayService; - + protected ProxyChannel(ProxyRelayService proxyRelayService, Channel parent, String remoteAddress, String localAddress) { super(parent, remoteAddress, localAddress); @@ -62,7 +62,7 @@ protected ProxyChannel(ProxyRelayService proxyRelayService, Channel parent, Stri this.remoteSocketAddress = NetworkUtil.string2SocketAddress(remoteAddress); this.localSocketAddress = NetworkUtil.string2SocketAddress(localAddress); } - + protected ProxyChannel(ProxyRelayService proxyRelayService, Channel parent, ChannelId id, String remoteAddress, String localAddress) { super(parent, id, remoteAddress, localAddress); @@ -70,7 +70,7 @@ protected ProxyChannel(ProxyRelayService proxyRelayService, Channel parent, Chan this.remoteSocketAddress = NetworkUtil.string2SocketAddress(remoteAddress); this.localSocketAddress = NetworkUtil.string2SocketAddress(localAddress); } - + @Override public ChannelFuture writeAndFlush(Object msg) { CompletableFuture processFuture = new CompletableFuture<>(); @@ -129,78 +129,90 @@ public ChannelFuture writeAndFlush(Object msg) { }); return promise; } - + protected abstract CompletableFuture processOtherMessage(Object msg); - + protected abstract CompletableFuture processCheckTransaction( CheckTransactionStateRequestHeader header, MessageExt messageExt, TransactionData transactionData, CompletableFuture> responseFuture); - + protected abstract CompletableFuture processNotifyUnsubscribeLite(NotifyUnsubscribeLiteRequestHeader header); - + protected abstract CompletableFuture processGetConsumerRunningInfo( RemotingCommand command, GetConsumerRunningInfoRequestHeader header, CompletableFuture> responseFuture); - + + public CompletableFuture> processGetConsumerRunningInfo( + RemotingCommand command, + GetConsumerRunningInfoRequestHeader header) { + CompletableFuture> responseFuture = new CompletableFuture<>(); + this.processGetConsumerRunningInfo(command, header, responseFuture) + .exceptionally(t -> { + responseFuture.completeExceptionally(t); + return null; + }); + return responseFuture; + } + protected abstract CompletableFuture processConsumeMessageDirectly( RemotingCommand command, ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt, CompletableFuture> responseFuture); - + @Override public ChannelConfig config() { return null; } - + @Override public ChannelMetadata metadata() { return null; } - + @Override protected AbstractUnsafe newUnsafe() { return null; } - + @Override protected boolean isCompatible(EventLoop loop) { return false; } - + @Override protected void doBind(SocketAddress localAddress) throws Exception { } - + @Override protected void doDisconnect() throws Exception { } - + @Override protected void doClose() throws Exception { } - + @Override protected void doBeginRead() throws Exception { } - + @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { } - + @Override protected SocketAddress localAddress0() { return this.localSocketAddress; } - + @Override protected SocketAddress remoteAddress0() { return this.remoteSocketAddress; diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerTest.java new file mode 100644 index 00000000000..19b1f880b1a --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting; + +import org.apache.rocketmq.proxy.config.InitConfigTest; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.apache.rocketmq.remoting.RemotingServer; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RemotingProtocolServerTest extends InitConfigTest { + @Mock + private MessagingProcessor messagingProcessor; + @Mock + private TlsCertificateManager tlsCertificateManager; + @Mock + private ProxyRelayService proxyRelayService; + private RemotingProtocolServer remotingProtocolServer; + + @Before + public void setUp() throws Exception { + when(messagingProcessor.getProxyRelayService()).thenReturn(proxyRelayService); + remotingProtocolServer = new RemotingProtocolServer(messagingProcessor, tlsCertificateManager); + } + + @After + public void tearDown() throws Exception { + if (remotingProtocolServer != null) { + remotingProtocolServer.shutdown(); + } + } + + @Test + public void testRegisterGetConsumerRunningInfoProcessor() { + RemotingServer remotingServer = mock(RemotingServer.class); + + remotingProtocolServer.registerRemotingServer(remotingServer); + + verify(remotingServer).registerProcessor( + eq(RequestCode.GET_CONSUMER_RUNNING_INFO), + same(remotingProtocolServer.consumerManagerActivity), + same(remotingProtocolServer.defaultExecutor)); + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java new file mode 100644 index 00000000000..f2aa240b2a8 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.proxy.config.InitConfigTest; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.service.channel.SimpleChannel; +import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; +import org.apache.rocketmq.proxy.service.relay.ProxyChannel; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.apache.rocketmq.proxy.service.transaction.TransactionData; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.NotifyUnsubscribeLiteRequestHeader; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ConsumerManagerActivityTest extends InitConfigTest { + private static final String GROUP = "group"; + private static final String CLIENT_ID = "clientId"; + + private ConsumerManagerActivity consumerManagerActivity; + + @Mock + private MessagingProcessor messagingProcessor; + @Mock + private ConsumerGroupInfo consumerGroupInfo; + @Mock + private ProxyRelayService consumerProxyRelayService; + @Mock + private ClientChannelInfo clientChannelInfo; + private CompletableFuture> consumerRunningInfoFuture; + private ProxyChannel consumerChannel; + @Spy + private ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) { + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return null; + } + }; + + @Before + public void setUp() { + this.consumerManagerActivity = new ConsumerManagerActivity(null, messagingProcessor); + this.consumerRunningInfoFuture = new CompletableFuture<>(); + this.consumerChannel = new ProxyChannel(consumerProxyRelayService, null, "127.0.0.1:1", "127.0.0.1:2") { + @Override + public boolean isOpen() { + return true; + } + + @Override + public boolean isActive() { + return true; + } + + @Override + protected CompletableFuture processOtherMessage(Object msg) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processCheckTransaction(CheckTransactionStateRequestHeader header, + MessageExt messageExt, TransactionData transactionData, + CompletableFuture> responseFuture) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processNotifyUnsubscribeLite(NotifyUnsubscribeLiteRequestHeader header) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processGetConsumerRunningInfo(RemotingCommand command, + GetConsumerRunningInfoRequestHeader header, + CompletableFuture> responseFuture) { + consumerRunningInfoFuture.whenComplete((result, t) -> { + if (t != null) { + responseFuture.completeExceptionally(t); + } else { + responseFuture.complete(result); + } + }); + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processConsumeMessageDirectly(RemotingCommand command, + ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt, + CompletableFuture> responseFuture) { + return CompletableFuture.completedFuture(null); + } + }; + } + + @Test + public void testGetConsumerRunningInfo() throws Exception { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(GROUP); + header.setClientId(CLIENT_ID); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + request.makeCustomHeaderToNet(); + ClientChannelInfo clientChannelInfo = new ClientChannelInfo(consumerChannel, CLIENT_ID, LanguageCode.JAVA, 0); + + when(messagingProcessor.getConsumerGroupInfo(any(), eq(GROUP))).thenReturn(consumerGroupInfo); + when(consumerGroupInfo.findChannel(eq(CLIENT_ID))).thenReturn(clientChannelInfo); + + RemotingCommand response = consumerManagerActivity.processRequest0(ctx, request, null); + assertThat(response).isNull(); + + ConsumerRunningInfo runningInfo = new ConsumerRunningInfo(); + runningInfo.setJstack("jstack"); + consumerRunningInfoFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "ok", runningInfo)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(RemotingCommand.class); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.SUCCESS); + assertThat(captor.getValue().getRemark()).isEqualTo("ok"); + assertThat(captor.getValue().getBody()).isEqualTo(runningInfo.encode()); + } + + @Test + public void testGetConsumerRunningInfoWhenRelayReturnsError() throws Exception { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(GROUP); + header.setClientId(CLIENT_ID); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + request.makeCustomHeaderToNet(); + ClientChannelInfo proxyClientChannelInfo = new ClientChannelInfo(consumerChannel, CLIENT_ID, LanguageCode.JAVA, 0); + + when(messagingProcessor.getConsumerGroupInfo(any(), eq(GROUP))).thenReturn(consumerGroupInfo); + when(consumerGroupInfo.findChannel(eq(CLIENT_ID))).thenReturn(proxyClientChannelInfo); + + RemotingCommand response = consumerManagerActivity.processRequest0(ctx, request, null); + assertThat(response).isNull(); + + consumerRunningInfoFuture.complete(new ProxyRelayResult<>(ResponseCode.SYSTEM_ERROR, "failed", null)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(RemotingCommand.class); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(captor.getValue().getRemark()).isEqualTo("failed"); + assertThat(captor.getValue().getBody()).isNull(); + } + + @Test + public void testGetConsumerRunningInfoWhenRelayThrows() throws Exception { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(GROUP); + header.setClientId(CLIENT_ID); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + request.makeCustomHeaderToNet(); + ClientChannelInfo proxyClientChannelInfo = new ClientChannelInfo(consumerChannel, CLIENT_ID, LanguageCode.JAVA, 0); + + when(messagingProcessor.getConsumerGroupInfo(any(), eq(GROUP))).thenReturn(consumerGroupInfo); + when(consumerGroupInfo.findChannel(eq(CLIENT_ID))).thenReturn(proxyClientChannelInfo); + + RemotingCommand response = consumerManagerActivity.processRequest0(ctx, request, null); + assertThat(response).isNull(); + + consumerRunningInfoFuture.completeExceptionally(new RuntimeException("failed")); + + ArgumentCaptor captor = ArgumentCaptor.forClass(RemotingCommand.class); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(captor.getValue().getRemark()).isEqualTo("failed"); + } + + @Test + public void testGetConsumerRunningInfoWhenConsumerNotOnline() throws Exception { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(GROUP); + header.setClientId(CLIENT_ID); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + request.makeCustomHeaderToNet(); + + when(messagingProcessor.getConsumerGroupInfo(any(), eq(GROUP))).thenReturn(null); + + RemotingCommand response = consumerManagerActivity.processRequest0(ctx, request, null); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).contains(GROUP, CLIENT_ID, "not online"); + } + + @Test + public void testGetConsumerRunningInfoWhenConsumerChannelIsNotProxyChannel() throws Exception { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(GROUP); + header.setClientId(CLIENT_ID); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + request.makeCustomHeaderToNet(); + + when(messagingProcessor.getConsumerGroupInfo(any(), eq(GROUP))).thenReturn(consumerGroupInfo); + when(consumerGroupInfo.findChannel(eq(CLIENT_ID))).thenReturn(clientChannelInfo); + when(clientChannelInfo.getChannel()).thenReturn(new SimpleChannel(null, "1", "2")); + + RemotingCommand response = consumerManagerActivity.processRequest0(ctx, request, null); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).contains(GROUP, CLIENT_ID, "not online"); + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayServiceTest.java new file mode 100644 index 00000000000..aaff42fe66b --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayServiceTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.relay; + +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ClusterProxyRelayServiceTest { + @Test + public void testProcessGetConsumerRunningInfoReturnsPendingFuture() { + ClusterProxyRelayService clusterProxyRelayService = new ClusterProxyRelayService(null); + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + + CompletableFuture> future = + clusterProxyRelayService.processGetConsumerRunningInfo(ProxyContext.create(), request, header); + + assertThat(future).isNotNull(); + assertThat(future).isNotDone(); + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ProxyChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ProxyChannelTest.java index 03be5cdb018..00ddb45fe6e 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ProxyChannelTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ProxyChannelTest.java @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.commons.lang3.NotImplementedException; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageDecoder; @@ -43,7 +44,9 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -160,4 +163,57 @@ protected CompletableFuture processNotifyUnsubscribeLite(NotifyUnsubscribe assertTrue(channel.writeAndFlush(consumerRunningInfoRequest).isSuccess()); assertTrue(channel.writeAndFlush(consumeMessageDirectlyResult).isSuccess()); } + + @Test + public void testProcessGetConsumerRunningInfoWhenProcessFails() { + RuntimeException exception = new RuntimeException("failed"); + MockProxyChannel channel = new MockProxyChannel(this.proxyRelayService, null, "127.0.0.2:8888", "127.0.0.1:10911") { + @Override + protected CompletableFuture processOtherMessage(Object msg) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processCheckTransaction(CheckTransactionStateRequestHeader header, + MessageExt messageExt, TransactionData transactionData, + CompletableFuture> responseFuture) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processGetConsumerRunningInfo(RemotingCommand command, + GetConsumerRunningInfoRequestHeader header, + CompletableFuture> responseFuture) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(exception); + return future; + } + + @Override + protected CompletableFuture processConsumeMessageDirectly(RemotingCommand command, + ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt, + CompletableFuture> responseFuture) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processNotifyUnsubscribeLite(NotifyUnsubscribeLiteRequestHeader header) { + return CompletableFuture.completedFuture(null); + } + }; + + CompletableFuture> future = + channel.processGetConsumerRunningInfo(mock(RemotingCommand.class), new GetConsumerRunningInfoRequestHeader()); + + assertTrue(future.isCompletedExceptionally()); + try { + future.get(); + fail("Expected the future to complete exceptionally"); + } catch (ExecutionException e) { + assertSame(exception, e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail("Interrupted while waiting for the future"); + } + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java index 88afbeef2a0..a1025ea346b 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.test.client.consumer.filter; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -25,6 +26,8 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.impl.FindBrokerResult; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -41,6 +44,7 @@ import org.junit.Test; import static com.google.common.truth.Truth.assertThat; +import static org.awaitility.Awaitility.await; public class SqlFilterIT extends BaseConf { private static Logger logger = LoggerFactory.getLogger(SqlFilterIT.class); @@ -98,6 +102,20 @@ public void testFilterPullConsumer() throws Exception { List receivedMessage = new ArrayList<>(2); Set mqs = consumer.fetchSubscribeMessageQueues(topic); + MQClientInstance mqClientInstance = consumer.getDefaultMQPullConsumerImpl() + .getRebalanceImpl().getmQClientFactory(); + await().atMost(Duration.ofSeconds(30)).until(() -> { + mqClientInstance.updateTopicRouteInfoFromNameServer(topic); + mqClientInstance.sendHeartbeatToAllBrokerWithLock(); + for (MessageQueue mq : mqs) { + FindBrokerResult brokerResult = mqClientInstance.findBrokerAddressInSubscribe( + mqClientInstance.getBrokerNameFromMessageQueue(mq), 0, false); + if (brokerResult == null || brokerResult.getBrokerVersion() == 0) { + return false; + } + } + return true; + }); for (MessageQueue mq : mqs) { SINGLE_MQ: while (true) {