diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index 529f15d06cd44..a3ee036f6ab05 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.exception.BadNodeUrlException; import org.apache.iotdb.commons.file.SystemPropertiesHandler; import org.apache.iotdb.commons.utils.NodeUrlUtils; +import org.apache.iotdb.consensus.ConsensusFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,10 +125,15 @@ public static void checkSystemProperties() throws IOException { } } + // Only the data region protocol could have been persisted as the old PipeConsensus name + // during a jar-only upgrade, so only that field needs compatibility normalization. // Consensus protocol configuration + boolean needRewriteConsensusProtocol = false; + String configNodeConsensusProtocolClass = systemProperties.getProperty(CN_CONSENSUS_PROTOCOL, null); - if (!configNodeConsensusProtocolClass.equals(conf.getConfigNodeConsensusProtocolClass())) { + if (!Objects.equals( + configNodeConsensusProtocolClass, conf.getConfigNodeConsensusProtocolClass())) { LOGGER.warn( format, CN_CONSENSUS_PROTOCOL, @@ -136,9 +142,22 @@ public static void checkSystemProperties() throws IOException { conf.setConfigNodeConsensusProtocolClass(configNodeConsensusProtocolClass); } - String dataRegionConsensusProtocolClass = + String persistedDataRegionConsensusProtocolClass = systemProperties.getProperty(DATA_CONSENSUS_PROTOCOL, null); - if (!dataRegionConsensusProtocolClass.equals(conf.getDataRegionConsensusProtocolClass())) { + String dataRegionConsensusProtocolClass = + ConsensusFactory.normalizeConsensusProtocolClass(persistedDataRegionConsensusProtocolClass); + if (!Objects.equals( + persistedDataRegionConsensusProtocolClass, dataRegionConsensusProtocolClass)) { + systemProperties.setProperty(DATA_CONSENSUS_PROTOCOL, dataRegionConsensusProtocolClass); + needRewriteConsensusProtocol = true; + LOGGER.warn( + "[SystemProperties] Normalize {} from {} to {} for compatibility.", + DATA_CONSENSUS_PROTOCOL, + persistedDataRegionConsensusProtocolClass, + dataRegionConsensusProtocolClass); + } + if (!Objects.equals( + dataRegionConsensusProtocolClass, conf.getDataRegionConsensusProtocolClass())) { LOGGER.warn( format, DATA_CONSENSUS_PROTOCOL, @@ -149,7 +168,8 @@ public static void checkSystemProperties() throws IOException { String schemaRegionConsensusProtocolClass = systemProperties.getProperty(SCHEMA_CONSENSUS_PROTOCOL, null); - if (!schemaRegionConsensusProtocolClass.equals(conf.getSchemaRegionConsensusProtocolClass())) { + if (!Objects.equals( + schemaRegionConsensusProtocolClass, conf.getSchemaRegionConsensusProtocolClass())) { LOGGER.warn( format, SCHEMA_CONSENSUS_PROTOCOL, @@ -157,6 +177,9 @@ public static void checkSystemProperties() throws IOException { schemaRegionConsensusProtocolClass); conf.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass); } + if (needRewriteConsensusProtocol) { + systemPropertiesHandler.overwrite(systemProperties); + } // PartitionSlot configuration if (systemProperties.getProperty(SERIES_PARTITION_SLOT_NUM, null) != null) { @@ -265,7 +288,9 @@ public static void storeSystemParameters() throws IOException { // Consensus protocol configuration systemProperties.setProperty(CN_CONSENSUS_PROTOCOL, conf.getConfigNodeConsensusProtocolClass()); systemProperties.setProperty( - DATA_CONSENSUS_PROTOCOL, conf.getDataRegionConsensusProtocolClass()); + DATA_CONSENSUS_PROTOCOL, + ConsensusFactory.normalizeConsensusProtocolClass( + conf.getDataRegionConsensusProtocolClass())); systemProperties.setProperty( SCHEMA_CONSENSUS_PROTOCOL, conf.getSchemaRegionConsensusProtocolClass()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java index 896a877e3e776..d24955aba7396 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java @@ -37,6 +37,10 @@ public class ConsensusFactory { public static final String SIMPLE_CONSENSUS = "org.apache.iotdb.consensus.simple.SimpleConsensus"; public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus"; public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus"; + // Keep the pre-rename class name for stale system properties / snapshots restored after a + // jar-only upgrade. + public static final String LEGACY_IOT_CONSENSUS_V2 = + "org.apache.iotdb.consensus.pipe.PipeConsensus"; public static final String REAL_IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.pipe.IoTConsensusV2"; public static final String IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.iot.IoTConsensusV2"; @@ -49,11 +53,24 @@ private ConsensusFactory() { throw new IllegalStateException("Utility class ConsensusFactory"); } + // Downstream code compares against IOT_CONSENSUS_V2 directly, so persisted legacy names must be + // normalized to the canonical constant before they fan out. + public static String normalizeConsensusProtocolClass(String className) { + if (className == null) { + return null; + } + if (LEGACY_IOT_CONSENSUS_V2.equals(className) || REAL_IOT_CONSENSUS_V2.equals(className)) { + return IOT_CONSENSUS_V2; + } + return className; + } + public static Optional getConsensusImpl( String className, ConsensusConfig config, IStateMachine.Registry registry) { try { + className = normalizeConsensusProtocolClass(className); // special judge for IoTConsensusV2 - if (className.equals(IOT_CONSENSUS_V2)) { + if (IOT_CONSENSUS_V2.equals(className)) { className = REAL_IOT_CONSENSUS_V2; // initialize iotConsensusV2's thrift component IoTV2GlobalComponentContainer.build(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java index e2aa1950af6a5..d85444a56d454 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java @@ -282,13 +282,33 @@ private void checkImmutableSystemProperties() throws IOException { if (properties.containsKey(CLUSTER_ID)) { config.setClusterId(properties.getProperty(CLUSTER_ID)); } + // Only the data region protocol could have been persisted as the old PipeConsensus name + // during a jar-only upgrade, so only that field needs compatibility normalization. + boolean needRewriteConsensusProtocol = false; if (properties.containsKey(SCHEMA_REGION_CONSENSUS_PROTOCOL)) { config.setSchemaRegionConsensusProtocolClass( properties.getProperty(SCHEMA_REGION_CONSENSUS_PROTOCOL)); } if (properties.containsKey(DATA_REGION_CONSENSUS_PROTOCOL)) { - config.setDataRegionConsensusProtocolClass( - properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL)); + final String persistedDataRegionConsensusProtocolClass = + properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL); + final String dataRegionConsensusProtocolClass = + ConsensusFactory.normalizeConsensusProtocolClass( + persistedDataRegionConsensusProtocolClass); + if (!Objects.equals( + persistedDataRegionConsensusProtocolClass, dataRegionConsensusProtocolClass)) { + properties.setProperty(DATA_REGION_CONSENSUS_PROTOCOL, dataRegionConsensusProtocolClass); + needRewriteConsensusProtocol = true; + logger.warn( + "[SystemProperties] Normalize {} from {} to {} for compatibility.", + DATA_REGION_CONSENSUS_PROTOCOL, + persistedDataRegionConsensusProtocolClass, + dataRegionConsensusProtocolClass); + } + config.setDataRegionConsensusProtocolClass(dataRegionConsensusProtocolClass); + } + if (needRewriteConsensusProtocol) { + systemPropertiesHandler.overwrite(properties); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 5bd2ef17509d8..a6160e6b90088 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -69,6 +69,10 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName(), IoTConsensusV2Processor::new); + // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta. + pluginConstructors.put( + BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(), + IoTConsensusV2Processor::new); pluginConstructors.put( BuiltinPipePlugin.RENAME_DATABASE_PROCESSOR.getPipePluginName(), RenameDatabaseProcessor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java index cbd77ebc9025e..c8e87890afd69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java @@ -56,6 +56,10 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName(), IoTConsensusV2AsyncSink::new); + // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta. + pluginConstructors.put( + BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName(), + IoTConsensusV2AsyncSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(), IoTDBLegacyPipeSink::new); @@ -97,5 +101,9 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName(), IoTConsensusV2AsyncSink::new); + // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta. + pluginConstructors.put( + BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(), + IoTConsensusV2AsyncSink::new); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java index 5d4d6a640b668..b0f0b34e92a51 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java @@ -26,6 +26,8 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -118,6 +120,20 @@ public void testPipePluginAgent() { } })) .getClass()); + Assert.assertEquals( + IoTConsensusV2Processor.class, + agent + .dataRegion() + .reflectProcessor( + new PipeParameters( + new HashMap() { + { + put( + PipeProcessorConstant.PROCESSOR_KEY, + BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName()); + } + })) + .getClass()); Assert.assertEquals( IoTDBDataRegionAsyncSink.class, agent @@ -132,5 +148,19 @@ public void testPipePluginAgent() { } })) .getClass()); + Assert.assertEquals( + IoTConsensusV2AsyncSink.class, + agent + .dataRegion() + .reflectSink( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName()); + } + })) + .getClass()); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index e95ddb61e1a2c..76b45d1e79895 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -74,6 +74,8 @@ public enum BuiltinPipePlugin { STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class), TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class), IOT_CONSENSUS_V2_PROCESSOR("iot-consensus-v2-processor", IoTConsensusV2Processor.class), + // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename. + PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", IoTConsensusV2Processor.class), RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class), // connectors @@ -86,6 +88,8 @@ public enum BuiltinPipePlugin { IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class), IOT_CONSENSUS_V2_ASYNC_CONNECTOR( "iot-consensus-v2-async-connector", IoTConsensusV2AsyncSink.class), + // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename. + PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", IoTConsensusV2AsyncSink.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class), OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class), @@ -105,6 +109,8 @@ public enum BuiltinPipePlugin { WRITE_BACK_SINK("write-back-sink", WriteBackSink.class), SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class), IOT_CONSENSUS_V2_ASYNC_SINK("iot-consensus-v2-async-sink", IoTConsensusV2AsyncSink.class), + // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename. + PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", IoTConsensusV2AsyncSink.class), ; private final String pipePluginName; @@ -158,6 +164,7 @@ public String getClassName() { STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(), TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(), IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName().toUpperCase(), + PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(), RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(), // Connectors DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(), @@ -172,6 +179,7 @@ public String getClassName() { OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(), WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(), IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), + PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), // Sinks IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(), IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(), @@ -180,5 +188,6 @@ public String getClassName() { OPC_UA_SINK.getPipePluginName().toUpperCase(), OPC_DA_SINK.getPipePluginName().toUpperCase(), SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(), - IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase()))); + IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase(), + PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase()))); }