Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.file.SystemPropertiesHandler;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.consensus.ConsensusFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -124,10 +125,15 @@ public static void checkSystemProperties() throws IOException {
}
}

// Only the data region protocol could have been persisted as the old PipeConsensus name
// during a jar-only upgrade, so only that field needs compatibility normalization.
// Consensus protocol configuration
boolean needRewriteConsensusProtocol = false;

String configNodeConsensusProtocolClass =
systemProperties.getProperty(CN_CONSENSUS_PROTOCOL, null);
if (!configNodeConsensusProtocolClass.equals(conf.getConfigNodeConsensusProtocolClass())) {
if (!Objects.equals(
configNodeConsensusProtocolClass, conf.getConfigNodeConsensusProtocolClass())) {
LOGGER.warn(
format,
CN_CONSENSUS_PROTOCOL,
Expand All @@ -136,9 +142,22 @@ public static void checkSystemProperties() throws IOException {
conf.setConfigNodeConsensusProtocolClass(configNodeConsensusProtocolClass);
}

String dataRegionConsensusProtocolClass =
String persistedDataRegionConsensusProtocolClass =
systemProperties.getProperty(DATA_CONSENSUS_PROTOCOL, null);
if (!dataRegionConsensusProtocolClass.equals(conf.getDataRegionConsensusProtocolClass())) {
String dataRegionConsensusProtocolClass =
ConsensusFactory.normalizeConsensusProtocolClass(persistedDataRegionConsensusProtocolClass);
if (!Objects.equals(
persistedDataRegionConsensusProtocolClass, dataRegionConsensusProtocolClass)) {
systemProperties.setProperty(DATA_CONSENSUS_PROTOCOL, dataRegionConsensusProtocolClass);
needRewriteConsensusProtocol = true;
LOGGER.warn(
"[SystemProperties] Normalize {} from {} to {} for compatibility.",
DATA_CONSENSUS_PROTOCOL,
persistedDataRegionConsensusProtocolClass,
dataRegionConsensusProtocolClass);
}
if (!Objects.equals(
dataRegionConsensusProtocolClass, conf.getDataRegionConsensusProtocolClass())) {
LOGGER.warn(
format,
DATA_CONSENSUS_PROTOCOL,
Expand All @@ -149,14 +168,18 @@ public static void checkSystemProperties() throws IOException {

String schemaRegionConsensusProtocolClass =
systemProperties.getProperty(SCHEMA_CONSENSUS_PROTOCOL, null);
if (!schemaRegionConsensusProtocolClass.equals(conf.getSchemaRegionConsensusProtocolClass())) {
if (!Objects.equals(
schemaRegionConsensusProtocolClass, conf.getSchemaRegionConsensusProtocolClass())) {
LOGGER.warn(
format,
SCHEMA_CONSENSUS_PROTOCOL,
conf.getSchemaRegionConsensusProtocolClass(),
schemaRegionConsensusProtocolClass);
conf.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass);
}
if (needRewriteConsensusProtocol) {
systemPropertiesHandler.overwrite(systemProperties);
}

// PartitionSlot configuration
if (systemProperties.getProperty(SERIES_PARTITION_SLOT_NUM, null) != null) {
Expand Down Expand Up @@ -265,7 +288,9 @@ public static void storeSystemParameters() throws IOException {
// Consensus protocol configuration
systemProperties.setProperty(CN_CONSENSUS_PROTOCOL, conf.getConfigNodeConsensusProtocolClass());
systemProperties.setProperty(
DATA_CONSENSUS_PROTOCOL, conf.getDataRegionConsensusProtocolClass());
DATA_CONSENSUS_PROTOCOL,
ConsensusFactory.normalizeConsensusProtocolClass(
conf.getDataRegionConsensusProtocolClass()));
systemProperties.setProperty(
SCHEMA_CONSENSUS_PROTOCOL, conf.getSchemaRegionConsensusProtocolClass());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public class ConsensusFactory {
public static final String SIMPLE_CONSENSUS = "org.apache.iotdb.consensus.simple.SimpleConsensus";
public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus";
public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus";
// Keep the pre-rename class name for stale system properties / snapshots restored after a
// jar-only upgrade.
public static final String LEGACY_IOT_CONSENSUS_V2 =
"org.apache.iotdb.consensus.pipe.PipeConsensus";
public static final String REAL_IOT_CONSENSUS_V2 =
"org.apache.iotdb.consensus.pipe.IoTConsensusV2";
public static final String IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.iot.IoTConsensusV2";
Expand All @@ -49,11 +53,24 @@ private ConsensusFactory() {
throw new IllegalStateException("Utility class ConsensusFactory");
}

// Downstream code compares against IOT_CONSENSUS_V2 directly, so persisted legacy names must be
// normalized to the canonical constant before they fan out.
public static String normalizeConsensusProtocolClass(String className) {
if (className == null) {
return null;
}
if (LEGACY_IOT_CONSENSUS_V2.equals(className) || REAL_IOT_CONSENSUS_V2.equals(className)) {
return IOT_CONSENSUS_V2;
}
return className;
}

public static Optional<IConsensus> getConsensusImpl(
String className, ConsensusConfig config, IStateMachine.Registry registry) {
try {
className = normalizeConsensusProtocolClass(className);
// special judge for IoTConsensusV2
if (className.equals(IOT_CONSENSUS_V2)) {
if (IOT_CONSENSUS_V2.equals(className)) {
className = REAL_IOT_CONSENSUS_V2;
// initialize iotConsensusV2's thrift component
IoTV2GlobalComponentContainer.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,33 @@ private void checkImmutableSystemProperties() throws IOException {
if (properties.containsKey(CLUSTER_ID)) {
config.setClusterId(properties.getProperty(CLUSTER_ID));
}
// Only the data region protocol could have been persisted as the old PipeConsensus name
// during a jar-only upgrade, so only that field needs compatibility normalization.
boolean needRewriteConsensusProtocol = false;
if (properties.containsKey(SCHEMA_REGION_CONSENSUS_PROTOCOL)) {
config.setSchemaRegionConsensusProtocolClass(
properties.getProperty(SCHEMA_REGION_CONSENSUS_PROTOCOL));
}
if (properties.containsKey(DATA_REGION_CONSENSUS_PROTOCOL)) {
config.setDataRegionConsensusProtocolClass(
properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL));
final String persistedDataRegionConsensusProtocolClass =
properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL);
final String dataRegionConsensusProtocolClass =
ConsensusFactory.normalizeConsensusProtocolClass(
persistedDataRegionConsensusProtocolClass);
if (!Objects.equals(
persistedDataRegionConsensusProtocolClass, dataRegionConsensusProtocolClass)) {
properties.setProperty(DATA_REGION_CONSENSUS_PROTOCOL, dataRegionConsensusProtocolClass);
needRewriteConsensusProtocol = true;
logger.warn(
"[SystemProperties] Normalize {} from {} to {} for compatibility.",
DATA_REGION_CONSENSUS_PROTOCOL,
persistedDataRegionConsensusProtocolClass,
dataRegionConsensusProtocolClass);
}
config.setDataRegionConsensusProtocolClass(dataRegionConsensusProtocolClass);
}
if (needRewriteConsensusProtocol) {
systemPropertiesHandler.overwrite(properties);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ protected void initConstructors() {
pluginConstructors.put(
BuiltinPipePlugin.IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName(),
IoTConsensusV2Processor::new);
// Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
pluginConstructors.put(
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
IoTConsensusV2Processor::new);
pluginConstructors.put(
BuiltinPipePlugin.RENAME_DATABASE_PROCESSOR.getPipePluginName(),
RenameDatabaseProcessor::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ protected void initConstructors() {
pluginConstructors.put(
BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName(),
IoTConsensusV2AsyncSink::new);
// Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
pluginConstructors.put(
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName(),
IoTConsensusV2AsyncSink::new);
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
IoTDBLegacyPipeSink::new);
Expand Down Expand Up @@ -97,5 +101,9 @@ protected void initConstructors() {
pluginConstructors.put(
BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName(),
IoTConsensusV2AsyncSink::new);
// Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
pluginConstructors.put(
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(),
IoTConsensusV2AsyncSink::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink;
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
Expand Down Expand Up @@ -118,6 +120,20 @@ public void testPipePluginAgent() {
}
}))
.getClass());
Assert.assertEquals(
IoTConsensusV2Processor.class,
agent
.dataRegion()
.reflectProcessor(
new PipeParameters(
new HashMap<String, String>() {
{
put(
PipeProcessorConstant.PROCESSOR_KEY,
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName());
}
}))
.getClass());
Assert.assertEquals(
IoTDBDataRegionAsyncSink.class,
agent
Expand All @@ -132,5 +148,19 @@ public void testPipePluginAgent() {
}
}))
.getClass());
Assert.assertEquals(
IoTConsensusV2AsyncSink.class,
agent
.dataRegion()
.reflectSink(
new PipeParameters(
new HashMap<String, String>() {
{
put(
PipeSinkConstant.CONNECTOR_KEY,
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName());
}
}))
.getClass());
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new compatibility alias PIPE_CONSENSUS_ASYNC_SINK is added in the constructors, but the test only verifies the legacy processor and connector aliases. Consider adding an assertion that reflectSink also resolves when the sink key is set to BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName() (i.e., cover the sink alias path, not just connector).

Suggested change
.getClass());
.getClass());
Assert.assertEquals(
IoTConsensusV2AsyncSink.class,
agent
.dataRegion()
.reflectSink(
new PipeParameters(
new HashMap<String, String>() {
{
put(
PipeSinkConstant.SINK_KEY,
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName());
}
}))
.getClass());

Copilot uses AI. Check for mistakes.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public enum BuiltinPipePlugin {
STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class),
TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class),
IOT_CONSENSUS_V2_PROCESSOR("iot-consensus-v2-processor", IoTConsensusV2Processor.class),
// Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", IoTConsensusV2Processor.class),
RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class),

// connectors
Expand All @@ -86,6 +88,8 @@ public enum BuiltinPipePlugin {
IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class),
IOT_CONSENSUS_V2_ASYNC_CONNECTOR(
"iot-consensus-v2-async-connector", IoTConsensusV2AsyncSink.class),
// Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", IoTConsensusV2AsyncSink.class),

WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class),
OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class),
Expand All @@ -105,6 +109,8 @@ public enum BuiltinPipePlugin {
WRITE_BACK_SINK("write-back-sink", WriteBackSink.class),
SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class),
IOT_CONSENSUS_V2_ASYNC_SINK("iot-consensus-v2-async-sink", IoTConsensusV2AsyncSink.class),
// Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", IoTConsensusV2AsyncSink.class),
;

private final String pipePluginName;
Expand Down Expand Up @@ -158,6 +164,7 @@ public String getClassName() {
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName().toUpperCase(),
PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(),
RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(),
// Connectors
DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
Expand All @@ -172,6 +179,7 @@ public String getClassName() {
OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(),
WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
// Sinks
IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
Expand All @@ -180,5 +188,6 @@ public String getClassName() {
OPC_UA_SINK.getPipePluginName().toUpperCase(),
OPC_DA_SINK.getPipePluginName().toUpperCase(),
SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(),
IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase())));
IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase(),
PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase())));
}
Loading