Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

protected final MessagingProcessor messagingProcessor;
protected final RemotingChannelManager remotingChannelManager;
protected final ChannelEventListener clientHousekeepingService;
Expand All @@ -90,12 +90,12 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
protected final ScheduledExecutorService timerExecutor;
protected final TlsCertificateManager tlsCertificateManager;
protected final RemotingTlsReloadHandler tlsReloadHandler;


public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertificateManager tlsCertificateManager) throws Exception {
this.messagingProcessor = messagingProcessor;
this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService());

RequestPipeline pipeline = createRequestPipeline(messagingProcessor);
this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor);
this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager);
Expand Down Expand Up @@ -197,7 +197,7 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertific

this.registerRemotingServer(this.defaultRemotingServer);
}

protected class RemotingTlsReloadHandler implements TlsCertificateManager.TlsContextReloadListener {
@Override
public void onTlsContextReload() {
Expand All @@ -207,7 +207,7 @@ public void onTlsContextReload() {
}
}
}

protected void registerRemotingServer(RemotingServer remotingServer) {
remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageActivity, this.sendMessageExecutor);
remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageActivity, this.sendMessageExecutor);
Expand All @@ -230,6 +230,7 @@ protected void registerRemotingServer(RemotingServer remotingServer) {
remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, consumerManagerActivity, this.updateOffsetExecutor);
remotingServer.registerProcessor(RequestCode.GET_CONSUMER_CONNECTION_LIST, consumerManagerActivity, this.updateOffsetExecutor);

remotingServer.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, consumerManagerActivity, this.defaultExecutor);
remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManagerActivity, this.defaultExecutor);
remotingServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManagerActivity, this.defaultExecutor);
remotingServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManagerActivity, this.defaultExecutor);
Expand All @@ -240,7 +241,7 @@ protected void registerRemotingServer(RemotingServer remotingServer) {

remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, getTopicRouteActivity, this.topicRouteExecutor);
}

@Override
public void shutdown() throws Exception {
// Unregister the TLS context reload handler
Expand All @@ -255,7 +256,7 @@ public void shutdown() throws Exception {
this.topicRouteExecutor.shutdown();
this.defaultExecutor.shutdown();
}

@Override
public void start() throws Exception {
// Register the TLS context reload handler
Expand All @@ -264,7 +265,7 @@ public void start() throws Exception {
this.remotingChannelManager.start();
this.defaultRemotingServer.start();
}

@Override
public CompletableFuture<RemotingCommand> invokeToClient(Channel channel, RemotingCommand request,
long timeoutMillis) {
Expand All @@ -291,7 +292,7 @@ public void operationFail(Throwable throwable) {
}
return future;
}

protected RequestPipeline createRequestPipeline(MessagingProcessor messagingProcessor) {
RequestPipeline pipeline = (ctx, request, context) -> {
};
Expand All @@ -304,7 +305,7 @@ protected RequestPipeline createRequestPipeline(MessagingProcessor messagingProc
}
return pipeline.pipe(new ContextInitPipeline());
}

protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor {

private final long maxWaitTimeMillsInQueue;
Expand All @@ -328,7 +329,7 @@ public boolean needPrintJstack(ThreadPoolExecutor executor, double value) {
return value > maxWaitTimeMillsInQueue;
}
}

protected long headSlowTimeMills(BlockingQueue<Runnable> q) {
try {
long slowTimeMills = 0;
Expand All @@ -348,7 +349,7 @@ protected long headSlowTimeMills(BlockingQueue<Runnable> q) {
}
return -1;
}

protected void cleanExpireRequest() {
ProxyConfig config = ConfigurationManager.getProxyConfig();

Expand All @@ -359,7 +360,7 @@ protected void cleanExpireRequest() {
cleanExpiredRequestInQueue(this.topicRouteExecutor, config.getRemotingWaitTimeMillsInTopicRouteQueue());
cleanExpiredRequestInQueue(this.defaultExecutor, config.getRemotingWaitTimeMillsInDefaultQueue());
}

protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor, long maxWaitTimeMillsInQueue) {
while (true) {
try {
Expand Down Expand Up @@ -391,7 +392,7 @@ protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor,
}
}
}

private RequestTask castRunnable(final Runnable runnable) {
try {
if (runnable instanceof FutureTaskExt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,36 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.proxy.service.relay.ProxyChannel;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;

public class ConsumerManagerActivity extends AbstractRemotingActivity {
public ConsumerManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) {
super(requestPipeline, messagingProcessor);
}

@Override
protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
Expand All @@ -73,12 +79,15 @@ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCom
case RequestCode.GET_CONSUMER_CONNECTION_LIST: {
return getConsumerConnectionList(ctx, request, context);
}
case RequestCode.GET_CONSUMER_RUNNING_INFO: {
return getConsumerRunningInfo(ctx, request, context);
}
default:
break;
}
return null;
}

protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
Expand All @@ -91,7 +100,7 @@ protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, Remo
response.setCode(ResponseCode.SUCCESS);
return response;
}

protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
Expand Down Expand Up @@ -128,7 +137,40 @@ protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, R
response.setRemark("the consumer group[" + header.getConsumerGroup() + "] not online");
return response;
}

protected RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
GetConsumerRunningInfoRequestHeader header =
(GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup());
ClientChannelInfo clientChannelInfo = null;
if (consumerGroupInfo != null) {
clientChannelInfo = consumerGroupInfo.findChannel(header.getClientId());
}
if (clientChannelInfo == null || !(clientChannelInfo.getChannel() instanceof ProxyChannel)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("The Consumer <%s> <%s> not online", header.getConsumerGroup(), header.getClientId()));
return response;
}

CompletableFuture<ProxyRelayResult<ConsumerRunningInfo>> relayFuture =
((ProxyChannel) clientChannelInfo.getChannel()).processGetConsumerRunningInfo(request, header);
relayFuture.thenAccept(result -> {
RemotingCommand relayResponse = RemotingCommand.createResponseCommand(null);
relayResponse.setCode(result.getCode());
relayResponse.setRemark(result.getRemark());
if (result.getCode() == ResponseCode.SUCCESS && result.getResult() != null) {
relayResponse.setBody(result.getResult().encode());
}
writeResponse(ctx, context, request, relayResponse);
}).exceptionally(t -> {
writeErrResponse(ctx, context, request, ExceptionUtils.getRealException(t));
return null;
});
return null;
}

protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
Expand All @@ -149,7 +191,7 @@ protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand
});
return null;
}

protected RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@
* not implement yet
*/
public class ClusterProxyRelayService extends AbstractProxyRelayService {

public ClusterProxyRelayService(TransactionService transactionService) {
super(transactionService);
}

@Override
public CompletableFuture<ProxyRelayResult<ConsumerRunningInfo>> processGetConsumerRunningInfo(
ProxyContext context, RemotingCommand command,
GetConsumerRunningInfoRequestHeader header) {
return null;
return new CompletableFuture<>();
}

@Override
public CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> processConsumeMessageDirectly(
ProxyContext context, RemotingCommand command,
Expand Down
Loading