Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
cac4f78
add isWakeCommitWhenPutMessage for AIO
Nov 14, 2024
38f03d0
Merge branch 'apache:develop' into develop
zk-drizzle Feb 25, 2025
63d0eb6
Merge branch 'apache:develop' into develop
zk-drizzle Mar 10, 2025
e75c906
Implemented Timer message, Transaction Message, Index based on rocksdb
Aug 8, 2025
09f7671
add timeline roll interval can be config
Aug 8, 2025
3d4aa00
optimize the code
Aug 8, 2025
0d0efeb
optimize the code
Aug 11, 2025
63cd3bf
add sone logic for recover commitlog
Aug 15, 2025
1184544
optimize the code
Sep 3, 2025
343adc8
update
Sep 3, 2025
7c5b86f
update
Sep 3, 2025
fb3fdf0
optimize the code
Sep 3, 2025
35bb4f5
update
Sep 4, 2025
3945e81
update
Sep 5, 2025
ca36da3
optimize the code
Sep 10, 2025
e9899ca
optimize the code
Sep 12, 2025
679b4d1
update
Sep 12, 2025
361d2d9
update
Sep 16, 2025
653ba0a
update
Sep 17, 2025
dadb1ac
update
Sep 17, 2025
900e29f
update
Sep 17, 2025
c290e67
update
Sep 25, 2025
04738f9
update
Sep 25, 2025
2ff33d0
update
Sep 25, 2025
18eb884
update
Sep 25, 2025
41e5782
update
Sep 25, 2025
de75d1b
update
Sep 25, 2025
1f1727b
update
Sep 25, 2025
4aada4b
Merge branch 'apache:develop' into develop
zk-drizzle Oct 22, 2025
cd4a00e
Implement Timer message, transaction message, and index based on RocksDB
Oct 23, 2025
ec95088
update
Oct 23, 2025
7a2aa07
update
Oct 23, 2025
4a67a25
update
zhouli11 Oct 23, 2025
b606d55
update
zhouli11 Oct 23, 2025
3e37b98
update
zhouli11 Oct 23, 2025
be6cac4
update
zhouli11 Oct 23, 2025
b9c9022
update
zhouli11 Oct 23, 2025
6ee5481
update
zhouli11 Oct 23, 2025
c67aae4
update
zhouli11 Oct 23, 2025
2872ab4
update the code
Oct 23, 2025
1739ba1
update
Oct 23, 2025
599777e
update the code
Oct 24, 2025
62e9b9f
update
Oct 24, 2025
e077a1a
update
Oct 27, 2025
67f4ef0
Merge branch 'apache:develop' into develop
zk-drizzle Oct 27, 2025
98757c6
update code
Oct 27, 2025
5224bfc
optimize the code
Oct 28, 2025
fd779ce
Merge branch 'apache:develop' into develop
zk-drizzle Oct 28, 2025
14d35e9
update
Oct 28, 2025
fb58f56
Merge remote-tracking branch 'github/develop' into opencore/develop_1022
Oct 29, 2025
873732a
update code
Oct 29, 2025
9fa994b
update
Oct 29, 2025
6f05121
update
Nov 5, 2025
f1cc7ba
update
Nov 6, 2025
f1fc2ad
Merge branch 'develop' into opencore/develop_1022
zk-drizzle Dec 11, 2025
8f3d44e
optimize the code
Dec 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.rocketmq.broker.transaction.TransactionMetricsFlushService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.rocksdb.TransactionalMessageRocksDBService;
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
Expand Down Expand Up @@ -178,6 +179,8 @@
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;
import org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStore;
import org.apache.rocketmq.store.transaction.TransMessageRocksDBStore;

public class BrokerController {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
Expand Down Expand Up @@ -269,6 +272,8 @@ public class BrokerController {
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
private TimerMessageStore timerMessageStore;
private TimerMessageRocksDBStore timerMessageRocksDBStore;
private TransMessageRocksDBStore transMessageRocksDBStore;
private TimerCheckpoint timerCheckpoint;
protected BrokerFastFailure brokerFastFailure;
private Configuration configuration;
Expand All @@ -277,6 +282,7 @@ public class BrokerController {
protected TransactionalMessageCheckService transactionalMessageCheckService;
protected TransactionalMessageService transactionalMessageService;
protected AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
protected TransactionalMessageRocksDBService transactionalMessageRocksDBService;
protected volatile boolean shutdown = false;
protected ShutdownHook shutdownHook;
private volatile boolean isScheduleServiceStart = false;
Expand Down Expand Up @@ -865,6 +871,14 @@ public boolean initializeMessageStore() {
this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
if (messageStoreConfig.isTimerRocksDBEnable()) {
this.timerMessageRocksDBStore = new TimerMessageRocksDBStore(messageStore, timerMetrics, brokerStatsManager);
this.messageStore.setTimerMessageRocksDBStore(timerMessageRocksDBStore);
}
}
if (messageStoreConfig.isTransRocksDBEnable()) {
this.transMessageRocksDBStore = new TransMessageRocksDBStore(messageStore, brokerStatsManager, new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
this.messageStore.setTransRocksDBStore(transMessageRocksDBStore);
}
} catch (Exception e) {
result = false;
Expand Down Expand Up @@ -904,6 +918,9 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {

if (messageStoreConfig.isTimerWheelEnable()) {
result = result && this.timerMessageStore.load();
if (messageStoreConfig.isTimerRocksDBEnable()) {
result = result && this.timerMessageRocksDBStore.load();
}
}

//scheduleMessageService load after messageStore load success
Expand Down Expand Up @@ -1060,6 +1077,10 @@ private void initialTransaction() {
this.transactionMetricsFlushService = new TransactionMetricsFlushService(this);
this.transactionMetricsFlushService.start();

if (messageStoreConfig.isTransRocksDBEnable()) {
this.transactionalMessageRocksDBService = new TransactionalMessageRocksDBService(messageStore, this);
this.transactionalMessageRocksDBService.start();
}
}

private void initialRpcHooks() {
Expand Down Expand Up @@ -1400,6 +1421,14 @@ public void setTimerMessageStore(TimerMessageStore timerMessageStore) {
this.timerMessageStore = timerMessageStore;
}

public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
return timerMessageRocksDBStore;
}

public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRocksDBStore) {
this.timerMessageRocksDBStore = timerMessageRocksDBStore;
}

public AckMessageProcessor getAckMessageProcessor() {
return ackMessageProcessor;
}
Expand Down Expand Up @@ -1473,6 +1502,10 @@ protected void shutdownBasicService() {
this.transactionMetricsFlushService.shutdown();
}

if (this.transactionalMessageRocksDBService != null) {
this.transactionalMessageRocksDBService.shutdown();
}

if (this.notificationProcessor != null) {
this.notificationProcessor.getPopLongPollingService().shutdown();
}
Expand All @@ -1488,6 +1521,15 @@ protected void shutdownBasicService() {
if (this.timerMessageStore != null) {
this.timerMessageStore.shutdown();
}

if (this.timerMessageRocksDBStore != null) {
this.timerMessageRocksDBStore.shutdown();
}

if (this.transMessageRocksDBStore != null) {
this.transMessageRocksDBStore.shutdown();
}

if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
Expand Down Expand Up @@ -1562,7 +1604,7 @@ protected void shutdownBasicService() {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(false);
}

if (this.loadBalanceExecutor != null) {
this.loadBalanceExecutor.shutdown();
}
Expand Down Expand Up @@ -1693,6 +1735,10 @@ protected void startBasicService() throws Exception {
this.timerMessageStore.start();
}

if (this.timerMessageRocksDBStore != null && this.messageStoreConfig.isTimerRocksDBEnable()) {
this.timerMessageRocksDBStore.start();
}

if (this.replicasManager != null) {
this.replicasManager.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.common.message.MessageConst.TIMER_ENGINE_TYPE;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;

public class AdminBrokerProcessor implements NettyRequestProcessor {
Expand Down Expand Up @@ -406,6 +407,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return this.listAcl(ctx, request);
case RequestCode.POP_ROLLBACK:
return this.transferPopToFsStore(ctx, request);
case RequestCode.SWITCH_TIMER_ENGINE:
return this.switchTimerEngine(ctx, request);
default:
return getUnknownCmdResponse(ctx, request);
}
Expand Down Expand Up @@ -2880,7 +2883,11 @@ private RemotingCommand resumeCheckHalfMessage(ChannelHandlerContext ctx,

private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
MessageExtBrokerInner inner = new MessageExtBrokerInner();
inner.setTopic(TransactionalMessageUtil.buildHalfTopic());
if (brokerController.getMessageStoreConfig().isTransRocksDBEnable() && !brokerController.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) {
inner.setTopic(TransactionalMessageUtil.buildHalfTopicForRocksDB());
} else {
inner.setTopic(TransactionalMessageUtil.buildHalfTopic());
}
inner.setBody(msgExt.getBody());
inner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(inner, msgExt.getProperties());
Expand Down Expand Up @@ -3409,4 +3416,64 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting
}
return response;
}

private synchronized RemotingCommand switchTimerEngine(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
if (!this.brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
LOGGER.info("switchTimerEngine error, broker timerWheelEnable is false");
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark("broker timerWheelEnable is false");
return response;
}
if (null == request.getExtFields()) {
LOGGER.info("switchTimerEngine extFields is null");
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark("param error, extFields is null");
return response;
}
String engineType = request.getExtFields().get(TIMER_ENGINE_TYPE);
if (StringUtils.isEmpty(engineType) || !MessageConst.TIMER_ENGINE_ROCKSDB_TIMELINE.equals(engineType) && !MessageConst.TIMER_ENGINE_FILE_TIME_WHEEL.equals(engineType)) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark("param error");
return response;
}
try {
Properties properties = new Properties();
boolean result = false;
if (MessageConst.TIMER_ENGINE_ROCKSDB_TIMELINE.equals(engineType)) {
if (this.brokerController.getTimerMessageRocksDBStore() == null) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark("timerUseRocksDB muse be configured true when broker start");
return response;
}
result = this.brokerController.getTimerMessageRocksDBStore().restart();
if (result) {
properties.put("timerStopEnqueue", Boolean.TRUE.toString());
properties.put("timerUseRocksDB", Boolean.TRUE.toString());
properties.put("timerRocksDBStopScan", Boolean.FALSE.toString());
}
} else {
result = this.brokerController.getTimerMessageStore().restart();
if (result) {
properties.put("timerRocksDBStopScan", Boolean.TRUE.toString());
properties.put("timerStopEnqueue", Boolean.FALSE.toString());
}
}
if (result) {
this.brokerController.getConfiguration().update(properties);
response.setCode(ResponseCode.SUCCESS);
response.setRemark("switch timer engine success");
LOGGER.info("switchTimerEngine success");
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("switch timer engine error");
LOGGER.info("switchTimerEngine error");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, @zk-drizzle . I'm just curious why all the logs here are INFO level, regardless of whether the restart succeeds or fails.

Shouldn't WARN or ERROR level logs be used to provide some warning information to the user when the restart fails?

}
} catch (Exception e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("switch timer engine error");
LOGGER.error("switchTimerEngine error : {}", e.getMessage());
}
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
Expand Down Expand Up @@ -146,7 +147,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
deletePrepareMessage(result);
// successful committed, then total num of half-messages minus 1
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(), -1);
this.brokerController.getBrokerMetricsManager().getCommitMessagesTotal().add(1, this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
Expand All @@ -173,7 +174,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
}
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
deletePrepareMessage(result);
// roll back, then total num of half-messages minus 1
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC), -1);
this.brokerController.getBrokerMetricsManager().getRollBackMessagesTotal().add(1, this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
Expand All @@ -188,6 +189,26 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
return response;
}

private void deletePrepareMessage(OperationResult result) {
if (null == result || null == result.getPrepareMessage()) {
LOGGER.error("deletePrepareMessage param error, result is null or prepareMessage is null");
return;
}
MessageExt prepareMessage = result.getPrepareMessage();
String halfTopic = prepareMessage.getTopic();
if (StringUtils.isEmpty(halfTopic)) {
LOGGER.error("deletePrepareMessage halfTopic is empty, halfTopic: {}", halfTopic);
return;
}
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(halfTopic)) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(prepareMessage);
} else if (this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && TopicValidator.RMQ_SYS_ROCKSDB_TRANS_HALF_TOPIC.equals(halfTopic)) {
this.brokerController.getMessageStore().getTransRocksDBStore().deletePrepareMessage(prepareMessage);
} else {
LOGGER.warn("deletePrepareMessage error, topic of half message is: {}, transRocksDBEnable: {}", halfTopic, this.brokerController.getMessageStoreConfig().isTransRocksDBEnable());
}
}

/**
* If you specify a custom first check time CheckImmunityTimeInSeconds,
* And the commit/rollback request whose validity period exceeds CheckImmunityTimeInSeconds and is not checked back will be processed and failed
Expand Down Expand Up @@ -265,10 +286,17 @@ private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
: TopicFilterType.SINGLE_TAG;
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
String checkTimes = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
if (StringUtils.isEmpty(checkTimes) && this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && null != this.brokerController.getMessageStore().getTransRocksDBStore()) {
Integer checkTimesRocksDB = this.brokerController.getMessageStore().getTransRocksDBStore().getCheckTimes(msgInner.getTopic(), msgInner.getTransactionId(), msgExt.getCommitLogOffset());
if (null != checkTimesRocksDB && checkTimesRocksDB >= 0) {
msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTimesRocksDB));
}
}
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(MessageDecoder.messageProperties2String(msgExt.getProperties())));
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import io.netty.channel.FileRegion;
import io.opentelemetry.api.common.Attributes;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
import org.apache.rocketmq.broker.pagecache.QueryMessageTransfer;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
Expand Down Expand Up @@ -84,16 +86,19 @@ public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand r
.decodeCommandCustomHeader(QueryMessageRequestHeader.class);

response.setOpaque(request.getOpaque());

String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
if (isUniqueKey != null && isUniqueKey.equals("true")) {
String indexType = requestHeader.getIndexType();
String lastKey = requestHeader.getLastKey();
String isUniqueKey = null;
if (null != request.getExtFields()) {
isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
}
if (!StringUtils.isEmpty(isUniqueKey) && Boolean.parseBoolean(isUniqueKey)) {
requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());
indexType = MessageConst.INDEX_UNIQUE_TYPE;
} else if (StringUtils.isEmpty(indexType)) {
indexType = MessageConst.INDEX_KEY_TYPE;
}

final QueryMessageResult queryMessageResult =
this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
requestHeader.getEndTimestamp());
final QueryMessageResult queryMessageResult = this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(), requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(), requestHeader.getEndTimestamp(), indexType, lastKey);
assert queryMessageResult != null;

responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());
Expand Down
Loading
Loading