diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 1eccd294b04..72d0905d6eb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -167,6 +167,12 @@ public static String getDefaultHapolicyBackupStrategy() { // how long (in ms) to wait to acquire a file lock on the journal private static long DEFAULT_JOURNAL_LOCK_ACQUISITION_TIMEOUT = -1; + // how long (in ms) between lock health checks + private static long DEFAULT_LOCK_MONITOR_TIMEOUT = 2000; + + // how many times to retry a failed lock health check before declaring the lock lost + private static int DEFAULT_LOCK_MONITOR_MAX_RETRIES = 0; + // true means that the server supports wild card routing private static boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true; @@ -827,6 +833,20 @@ public static long getDefaultJournalLockAcquisitionTimeout() { return DEFAULT_JOURNAL_LOCK_ACQUISITION_TIMEOUT; } + /** + * how long (in ms) between lock health checks + */ + public static long getDefaultLockMonitorTimeout() { + return DEFAULT_LOCK_MONITOR_TIMEOUT; + } + + /** + * how many times to retry a failed lock health check before declaring the lock lost + */ + public static int getDefaultLockMonitorMaxRetries() { + return DEFAULT_LOCK_MONITOR_MAX_RETRIES; + } + /** * {@code true} means that the server supports wild card routing */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 67aa0279cc2..2c16a0b6990 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -1237,6 +1237,14 @@ Configuration addDiscoveryGroupConfiguration(String key, long getJournalLockAcquisitionTimeout(); + Configuration setLockMonitorTimeout(long lockMonitorTimeout); + + long getLockMonitorTimeout(); + + Configuration setLockMonitorMaxRetries(int lockMonitorMaxRetries); + + int getLockMonitorMaxRetries(); + HAPolicyConfiguration getHAPolicyConfiguration(); Configuration setHAPolicyConfiguration(HAPolicyConfiguration haPolicyConfiguration); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index de5ed223a6f..cd5dd421568 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -402,6 +402,10 @@ public class ConfigurationImpl extends javax.security.auth.login.Configuration i private long journalLockAcquisitionTimeout = ActiveMQDefaultConfiguration.getDefaultJournalLockAcquisitionTimeout(); + private long lockMonitorTimeout = ActiveMQDefaultConfiguration.getDefaultLockMonitorTimeout(); + + private int lockMonitorMaxRetries = ActiveMQDefaultConfiguration.getDefaultLockMonitorMaxRetries(); + private HAPolicyConfiguration haPolicyConfiguration; private StoreConfiguration storeConfiguration; @@ -3010,7 +3014,7 @@ public int hashCode() { scheduledThreadPoolMaxSize, securityEnabled, populateValidatedUser, securityInvalidationInterval, securitySettings, serverDumpInterval, threadPoolMaxSize, transactionTimeout, transactionTimeoutScanPeriod, wildcardConfiguration, resolveProtocols, - journalLockAcquisitionTimeout, connectionTtlCheckInterval); + journalLockAcquisitionTimeout, lockMonitorTimeout, lockMonitorMaxRetries, connectionTtlCheckInterval); } @Override @@ -3091,6 +3095,8 @@ public boolean equals(Object obj) { Objects.equals(wildcardConfiguration, other.wildcardConfiguration) && resolveProtocols == other.resolveProtocols && journalLockAcquisitionTimeout == other.journalLockAcquisitionTimeout && + lockMonitorTimeout == other.lockMonitorTimeout && + lockMonitorMaxRetries == other.lockMonitorMaxRetries && connectionTtlCheckInterval == other.connectionTtlCheckInterval && journalDatasync == other.journalDatasync && Objects.equals(globalMaxSize, other.globalMaxSize) && @@ -3132,6 +3138,28 @@ public long getJournalLockAcquisitionTimeout() { return journalLockAcquisitionTimeout; } + @Override + public ConfigurationImpl setLockMonitorTimeout(long lockMonitorTimeout) { + this.lockMonitorTimeout = lockMonitorTimeout; + return this; + } + + @Override + public long getLockMonitorTimeout() { + return lockMonitorTimeout; + } + + @Override + public ConfigurationImpl setLockMonitorMaxRetries(int lockMonitorMaxRetries) { + this.lockMonitorMaxRetries = lockMonitorMaxRetries; + return this; + } + + @Override + public int getLockMonitorMaxRetries() { + return lockMonitorMaxRetries; + } + @Override public HAPolicyConfiguration getHAPolicyConfiguration() { return haPolicyConfiguration; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index bd6c5ccc3ff..7153d9d65a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -854,6 +854,10 @@ public void parseMainConfig(final Element e, final Configuration config) throws config.setJournalLockAcquisitionTimeout(getLong(e, "journal-lock-acquisition-timeout", config.getJournalLockAcquisitionTimeout(), MINUS_ONE_OR_GT_ZERO)); + config.setLockMonitorTimeout(getLong(e, "lock-monitor-timeout", config.getLockMonitorTimeout(), GT_ZERO)); + + config.setLockMonitorMaxRetries(getInteger(e, "lock-monitor-max-retries", config.getLockMonitorMaxRetries(), GE_ZERO)); + if (e.hasAttribute("wild-card-routing-enabled")) { config.setWildcardRoutingEnabled(getBoolean(e, "wild-card-routing-enabled", config.isWildcardRoutingEnabled())); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index e0d8055f958..d8806544131 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -637,12 +637,12 @@ protected NodeManager createNodeManager(final File directory, boolean replicatin } else if (haType == null || haType == HAPolicyConfiguration.TYPE.PRIMARY_ONLY) { logger.debug("Detected no Shared Store HA options on JDBC store"); //PRIMARY_ONLY should be the default HA option when HA isn't configured - manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout(), scheduledPool); + manager = new FileLockNodeManager(directory, replicatingBackup, configuration, scheduledPool); } else { throw new IllegalArgumentException("JDBC persistence allows only Shared Store HA options"); } } else { - manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout(), scheduledPool); + manager = new FileLockNodeManager(directory, replicatingBackup, configuration, scheduledPool); } return manager; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java index ee765811d08..53921c441cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java @@ -27,8 +27,11 @@ import java.util.Date; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; @@ -71,29 +74,35 @@ public class FileLockNodeManager extends FileBasedNodeManager { private final long lockAcquisitionTimeoutNanos; + private final long lockMonitorTimeoutNanos; + + private final int lockMonitorMaxRetries; + protected boolean interrupted = false; private final ScheduledExecutorService scheduledPool; - public FileLockNodeManager(final File directory, boolean replicatedBackup, ScheduledExecutorService scheduledPool) { - super(replicatedBackup, directory); - this.scheduledPool = scheduledPool; - this.lockAcquisitionTimeoutNanos = -1; - } - public FileLockNodeManager(final File directory, boolean replicatedBackup) { super(replicatedBackup, directory); this.scheduledPool = null; - this.lockAcquisitionTimeoutNanos = -1; + long timeout = ActiveMQDefaultConfiguration.getDefaultJournalLockAcquisitionTimeout(); + this.lockAcquisitionTimeoutNanos = timeout == -1 ? -1 : TimeUnit.MILLISECONDS.toNanos(timeout); + this.lockMonitorTimeoutNanos = TimeUnit.MILLISECONDS.toNanos( + ActiveMQDefaultConfiguration.getDefaultLockMonitorTimeout()); + this.lockMonitorMaxRetries = ActiveMQDefaultConfiguration.getDefaultLockMonitorMaxRetries(); } public FileLockNodeManager(final File directory, boolean replicatedBackup, - long lockAcquisitionTimeout, + Configuration configuration, ScheduledExecutorService scheduledPool) { super(replicatedBackup, directory); this.scheduledPool = scheduledPool; - this.lockAcquisitionTimeoutNanos = lockAcquisitionTimeout == -1 ? -1 : TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeout); + this.lockAcquisitionTimeoutNanos = configuration.getJournalLockAcquisitionTimeout() == -1 + ? -1 + : TimeUnit.MILLISECONDS.toNanos(configuration.getJournalLockAcquisitionTimeout()); + this.lockMonitorTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(configuration.getLockMonitorTimeout()); + this.lockMonitorMaxRetries = configuration.getLockMonitorMaxRetries(); } @Override @@ -477,7 +486,7 @@ protected FileLock lock(final int lockPosition) throws ClosedChannelException, A private synchronized void startLockMonitoring() { logger.debug("Starting the lock monitor"); if (monitorLock == null) { - monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_NANOS, LOCK_MONITOR_TIMEOUT_NANOS, TimeUnit.NANOSECONDS, false); + monitorLock = new MonitorLock(scheduledPool, lockMonitorTimeoutNanos, lockMonitorTimeoutNanos, TimeUnit.NANOSECONDS, false, lockMonitorMaxRetries); monitorLock.start(); } else { logger.debug("Lock monitor was already started"); @@ -503,7 +512,10 @@ protected synchronized void notifyLostLock() { // This has been introduced to help ByteMan test testLockMonitorInvalid on JDK 11: sun.nio.ch.FileLockImpl::isValid // can affecting setPrimary, causing an infinite loop due to java.nio.channels.OverlappingFileLockException on tryLock - private boolean isPrimaryLockLost() { + // Made protected to allow testing with controlled lock failures. + // Subclasses overriding this method should maintain the contract: return true when the primary lock is lost, + // false when the lock is healthy. The return value directly controls whether MonitorLock triggers notifyLostLock(). + protected boolean isPrimaryLockLost() { final FileLock lock = this.primaryLock; return (lock != null && !lock.isValid()) || lock == null; } @@ -511,12 +523,17 @@ private boolean isPrimaryLockLost() { private MonitorLock monitorLock; public class MonitorLock extends ActiveMQScheduledComponent { + private final int maxRetries; + private final AtomicInteger consecutiveFailures = new AtomicInteger(0); + public MonitorLock(ScheduledExecutorService scheduledExecutorService, long initialDelay, long checkPeriod, TimeUnit timeUnit, - boolean onDemand) { + boolean onDemand, + int maxRetries) { super(scheduledExecutorService, initialDelay, checkPeriod, timeUnit, onDemand); + this.maxRetries = maxRetries; } @@ -556,8 +573,19 @@ public void run() { } if (lostLock) { - logger.warn("Lost the lock according to the monitor, notifying listeners"); - notifyLostLock(); + int failures = consecutiveFailures.incrementAndGet(); + if (failures > maxRetries) { + logger.warn("Lost the lock according to the monitor after {} failed check(s), notifying listeners", failures); + notifyLostLock(); + } else { + logger.warn("Lock check failed ({}/{}), will retry on next check period", failures, maxRetries + 1); + } + } else { + // Reset the failure counter on successful check + int previousFailures = consecutiveFailures.getAndSet(0); + if (previousFailures > 0) { + logger.info("Lock check succeeded after {} previous failure(s), resetting failure counter", previousFailures); + } } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 2af3e1b1449..6115b4bff09 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -171,6 +171,22 @@ + + + + how long (in ms) between lock health checks + + + + + + + + how many times to retry a failed lock health check before declaring the lock lost (useful for transient storage outages) + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index 6d43db8f4b7..df4adabcc8b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -174,6 +174,8 @@ public void testDefaults() { assertEquals(ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir(), conf.getLargeMessagesDirectory()); assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), conf.getJournalCompactPercentage()); assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalLockAcquisitionTimeout(), conf.getJournalLockAcquisitionTimeout()); + assertEquals(ActiveMQDefaultConfiguration.getDefaultLockMonitorTimeout(), conf.getLockMonitorTimeout()); + assertEquals(ActiveMQDefaultConfiguration.getDefaultLockMonitorMaxRetries(), conf.getLockMonitorMaxRetries()); assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, conf.getJournalBufferTimeout_AIO()); assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, conf.getJournalBufferTimeout_NIO()); assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java index 8701f6cc31f..1f56296deb0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java @@ -110,6 +110,10 @@ public void testDefaults() { assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalLockAcquisitionTimeout(), conf.getJournalLockAcquisitionTimeout()); + assertEquals(ActiveMQDefaultConfiguration.getDefaultLockMonitorTimeout(), conf.getLockMonitorTimeout()); + + assertEquals(ActiveMQDefaultConfiguration.getDefaultLockMonitorMaxRetries(), conf.getLockMonitorMaxRetries()); + assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalMinFiles(), conf.getJournalMinFiles()); assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), conf.getJournalMaxIO_AIO()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManagerMockRecoveryTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManagerMockRecoveryTest.java new file mode 100644 index 00000000000..dc3c515326f --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManagerMockRecoveryTest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.io.File; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.server.ActivateCallback; +import org.apache.activemq.artemis.tests.extensions.TargetTempDirFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Mock-based test demonstrating the transient failure recovery scenario. + * This simulates the CephFS MDS failover use case where the filesystem + * temporarily becomes unavailable but recovers within the retry window. + */ +public class FileLockNodeManagerMockRecoveryTest { + + @TempDir(factory = TargetTempDirFactory.class) + public File temporaryFolder; + + private ScheduledExecutorService scheduledPool; + + @BeforeEach + public void setup() { + scheduledPool = Executors.newScheduledThreadPool(2); + } + + @AfterEach + public void teardown() { + if (scheduledPool != null) { + scheduledPool.shutdownNow(); + } + } + + /** + * Testable FileLockNodeManager that allows us to inject controlled lock check failures. + * This simulates the CephFS MDS failover scenario: + * - Check 1: Fail (MDS is failing over) + * - Check 2: Fail (MDS still failing over) + * - Check 3: Success (MDS recovered) + * - Result: No lock lost notification because recovery happened within retry window + */ + static class MockableFileLockNodeManager extends FileLockNodeManager { + private final Queue lockHealthResults = new ConcurrentLinkedQueue<>(); + private final AtomicInteger checkCount = new AtomicInteger(0); + private volatile Boolean lastResult = null; + + MockableFileLockNodeManager(File directory, + boolean replicatedBackup, + long lockAcquisitionTimeout, + long lockMonitorTimeout, + int lockMonitorMaxRetries, + ScheduledExecutorService scheduledPool) { + super(directory, replicatedBackup, + TestConfigurationBuilder.forLockTesting(lockAcquisitionTimeout, lockMonitorTimeout, lockMonitorMaxRetries), + scheduledPool); + } + + /** + * Queue a sequence of lock health check results. + * true = healthy, false = failed + */ + public void queueLockHealthResults(Boolean... results) { + for (Boolean result : results) { + lockHealthResults.offer(result); + } + } + + /** + * Override the primary lock lost check to return our controlled results. + * This simulates transient storage failures. + * After queue is exhausted, continues returning the last queued result. + */ + @Override + protected boolean isPrimaryLockLost() { + checkCount.incrementAndGet(); + Boolean nextResult = lockHealthResults.poll(); + if (nextResult != null) { + lastResult = nextResult; + } + // If we have a result (from queue or last), use it + if (lastResult != null) { + // Return inverted because isPrimaryLockLost() returns true when lock IS lost + return !lastResult; + } + // If no results ever queued, use real check + return super.isPrimaryLockLost(); + } + + public int getCheckCount() { + return checkCount.get(); + } + } + + /** + * Test successful recovery scenario: + * - System configured with 2 retries (tolerates 3 consecutive failures) + * - First 2 checks fail (simulating MDS failover in progress) + * - Third check succeeds (MDS recovered) + * - Result: No lock lost notification, system continues running + */ + @Test + @Timeout(10) + public void testTransientFailureThenRecovery() throws Exception { + MockableFileLockNodeManager manager = new MockableFileLockNodeManager( + temporaryFolder, false, -1, 200, 2, scheduledPool); + + // Simulate CephFS MDS failover scenario: + // Fail, Fail, Success (recovery within retry window) + manager.queueLockHealthResults(false, false, true); + + manager.start(); + + AtomicInteger lostLockCallCount = new AtomicInteger(0); + CountDownLatch lostLockLatch = new CountDownLatch(1); + + manager.registerLockListener(() -> { + lostLockCallCount.incrementAndGet(); + lostLockLatch.countDown(); + }); + + ActivateCallback callback = manager.startPrimaryNode(); + callback.activationComplete(); + + // Wait enough time for 3 checks to occur + Thread.sleep(800); + + // Verify lock was NOT lost because it recovered within the retry window + assertEquals(0, lostLockCallCount.get(), + "Lock should not be declared lost - it recovered within retry window"); + + assertTrue(manager.getCheckCount() >= 3, + "Should have performed at least 3 health checks"); + + manager.stop(); + } + + /** + * Test permanent failure scenario: + * - System configured with 2 retries + * - All checks fail (storage is truly gone) + * - Result: Lock lost notification after retries exhausted + */ + @Test + @Timeout(10) + public void testPermanentFailure() throws Exception { + MockableFileLockNodeManager manager = new MockableFileLockNodeManager( + temporaryFolder, false, -1, 200, 2, scheduledPool); + + // Simulate permanent failure: all checks fail + manager.queueLockHealthResults(false, false, false, false); + + manager.start(); + + AtomicInteger lostLockCallCount = new AtomicInteger(0); + CountDownLatch lostLockLatch = new CountDownLatch(1); + + manager.registerLockListener(() -> { + lostLockCallCount.incrementAndGet(); + lostLockLatch.countDown(); + }); + + ActivateCallback callback = manager.startPrimaryNode(); + callback.activationComplete(); + + // Wait for lock to be declared lost + boolean notified = lostLockLatch.await(2, TimeUnit.SECONDS); + + assertTrue(notified, "Lock should be declared lost after retries exhausted"); + assertEquals(1, lostLockCallCount.get(), + "Lost lock listener should be called exactly once"); + + manager.stop(); + } + + /** + * Test intermittent failures with eventual recovery: + * - Fail, Success, Fail, Success pattern + * - Counter should reset on each success + * - Should never accumulate enough consecutive failures to trigger notification + */ + @Test + @Timeout(10) + public void testIntermittentFailuresWithRecovery() throws Exception { + MockableFileLockNodeManager manager = new MockableFileLockNodeManager( + temporaryFolder, false, -1, 200, 1, scheduledPool); + + // Intermittent failures: Fail, Success, Fail, Success, Fail, Success + // With 1 retry (max 2 consecutive), should never trigger lost lock + manager.queueLockHealthResults(false, true, false, true, false, true); + + manager.start(); + + AtomicInteger lostLockCallCount = new AtomicInteger(0); + + manager.registerLockListener(() -> { + lostLockCallCount.incrementAndGet(); + }); + + ActivateCallback callback = manager.startPrimaryNode(); + callback.activationComplete(); + + // Wait for all checks to complete + Thread.sleep(1500); + + // Should never lose lock because consecutive failures never exceed maxRetries + assertEquals(0, lostLockCallCount.get(), + "Lock should not be lost with intermittent failures and recovery"); + + manager.stop(); + } + + /** + * Test edge case: exactly maxRetries+1 consecutive failures + * Should trigger notification on the last failure + */ + @Test + @Timeout(10) + public void testExactThresholdFailures() throws Exception { + MockableFileLockNodeManager manager = new MockableFileLockNodeManager( + temporaryFolder, false, -1, 200, 2, scheduledPool); + + // Exactly 3 failures (maxRetries=2, so 2+1=3 triggers notification) + manager.queueLockHealthResults(false, false, false); + + manager.start(); + + AtomicInteger lostLockCallCount = new AtomicInteger(0); + CountDownLatch lostLockLatch = new CountDownLatch(1); + + manager.registerLockListener(() -> { + lostLockCallCount.incrementAndGet(); + lostLockLatch.countDown(); + }); + + ActivateCallback callback = manager.startPrimaryNode(); + callback.activationComplete(); + + // Wait for notification + boolean notified = lostLockLatch.await(2, TimeUnit.SECONDS); + + assertTrue(notified, "Should notify exactly when threshold is reached"); + assertEquals(1, lostLockCallCount.get()); + + manager.stop(); + } +} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManagerRetryTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManagerRetryTest.java new file mode 100644 index 00000000000..dbe2556cad4 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManagerRetryTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.io.File; +import java.lang.reflect.Field; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.tests.extensions.TargetTempDirFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Comprehensive tests for lock monitor retry mechanism. + * Tests the specific retry behavior introduced for ENTMQBR-10718. + */ +public class FileLockNodeManagerRetryTest { + + @TempDir(factory = TargetTempDirFactory.class) + public File temporaryFolder; + + private ScheduledExecutorService scheduledPool; + + @BeforeEach + public void setup() { + scheduledPool = Executors.newScheduledThreadPool(2); + } + + @AfterEach + public void teardown() { + if (scheduledPool != null) { + scheduledPool.shutdownNow(); + } + } + + /** + * Test that with 0 retries (default), the first failure triggers notifyLostLock + */ + @Test + @Timeout(10) + public void testNoRetriesFailsImmediately() throws Exception { + Configuration config = TestConfigurationBuilder.forLockTesting(-1, 200, 0); + FileLockNodeManager manager = new FileLockNodeManager(temporaryFolder, false, config, scheduledPool); + manager.start(); + + AtomicInteger lostLockCallCount = new AtomicInteger(0); + CountDownLatch lostLockLatch = new CountDownLatch(1); + + manager.registerLockListener(() -> { + lostLockCallCount.incrementAndGet(); + lostLockLatch.countDown(); + }); + + manager.startPrimaryNode().activationComplete(); + assertTrue(manager.isPrimaryLocked()); + + // Close channel to simulate failure + manager.getChannel().close(); + + // Should fail on first check (no retries) + boolean notified = lostLockLatch.await(5, TimeUnit.SECONDS); + assertTrue(notified, "Should notify lost lock on first failure with 0 retries"); + assertTrue(lostLockCallCount.get() >= 1, "Should have at least one notification"); + + manager.stop(); + } + + /** + * Test that with 1 retry, it takes 2 consecutive failures before notifying + */ + @Test + @Timeout(10) + public void testOneRetryRequiresTwoFailures() throws Exception { + Configuration config = TestConfigurationBuilder.forLockTesting(-1, 200, 1); + FileLockNodeManager manager = new FileLockNodeManager(temporaryFolder, false, config, scheduledPool); + manager.start(); + + AtomicInteger lostLockCallCount = new AtomicInteger(0); + CountDownLatch lostLockLatch = new CountDownLatch(1); + + manager.registerLockListener(() -> { + lostLockCallCount.incrementAndGet(); + lostLockLatch.countDown(); + }); + + manager.startPrimaryNode().activationComplete(); + assertTrue(manager.isPrimaryLocked()); + + // Close channel - should fail but not notify yet (1 retry available) + manager.getChannel().close(); + + // Wait enough time for 2 check periods (first fail + retry) + boolean notified = lostLockLatch.await(1, TimeUnit.SECONDS); + assertTrue(notified, "Should notify after retry exhausted (2 failures total)"); + assertTrue(lostLockCallCount.get() >= 1, "Should have at least one notification"); + + manager.stop(); + } + + /** + * Test recovery scenario: transient failure followed by recovery + * This simulates the CephFS MDS failover use case + */ + @Test + @Timeout(15) + public void testTransientFailureRecovery() throws Exception { + // Use slightly longer period to control timing better + Configuration config = TestConfigurationBuilder.forLockTesting(-1, 300, 2); + FileLockNodeManager manager = new FileLockNodeManager(temporaryFolder, false, config, scheduledPool); + manager.start(); + + AtomicInteger lostLockCallCount = new AtomicInteger(0); + CountDownLatch lostLockLatch = new CountDownLatch(1); + + manager.registerLockListener(() -> { + lostLockCallCount.incrementAndGet(); + lostLockLatch.countDown(); + }); + + manager.startPrimaryNode().activationComplete(); + assertTrue(manager.isPrimaryLocked()); + + // Simulate transient failure: close channel briefly + manager.getChannel().close(); + + // Wait for 1-2 failed checks to occur + Thread.sleep(500); + + // Verify lock was NOT declared lost yet (we have 2 retries) + assertEquals(0, lostLockCallCount.get(), "Should not have lost lock yet - still have retries"); + + // Note: In a real scenario, the filesystem would recover here + // In this test, we can't easily recover the channel, so we verify + // the behavior up to this point (retries prevented immediate failure) + + manager.stop(); + } + + /** + * Test with higher retry count + */ + @Test + @Timeout(15) + public void testMultipleRetries() throws Exception { + Configuration config = TestConfigurationBuilder.forLockTesting(-1, 150, 3); + FileLockNodeManager manager = new FileLockNodeManager(temporaryFolder, false, config, scheduledPool); + manager.start(); + + AtomicInteger lostLockCallCount = new AtomicInteger(0); + CountDownLatch lostLockLatch = new CountDownLatch(1); + + manager.registerLockListener(() -> { + lostLockCallCount.incrementAndGet(); + lostLockLatch.countDown(); + }); + + manager.startPrimaryNode().activationComplete(); + + // Close channel to cause failures + manager.getChannel().close(); + + // With 3 retries, should take 4 consecutive failures + // At 150ms period, that's ~600ms minimum + boolean notified = lostLockLatch.await(2, TimeUnit.SECONDS); + assertTrue(notified, "Should eventually notify after all retries exhausted"); + assertTrue(lostLockCallCount.get() >= 1, "Should have at least one notification"); + + manager.stop(); + } + + /** + * Test accessing the MonitorLock internal state via reflection + * This allows us to verify the consecutiveFailures counter behavior + */ + @Test + @Timeout(10) + public void testConsecutiveFailuresCounterViaReflection() throws Exception { + Configuration config = TestConfigurationBuilder.forLockTesting(-1, 200, 2); + FileLockNodeManager manager = new FileLockNodeManager(temporaryFolder, false, config, scheduledPool); + manager.start(); + manager.startPrimaryNode().activationComplete(); + + // Get the MonitorLock instance via reflection + Field monitorLockField = FileLockNodeManager.class.getDeclaredField("monitorLock"); + monitorLockField.setAccessible(true); + Object monitorLock = monitorLockField.get(manager); + + // Get the consecutiveFailures field (now an AtomicInteger) + Field consecutiveFailuresField = monitorLock.getClass().getDeclaredField("consecutiveFailures"); + consecutiveFailuresField.setAccessible(true); + java.util.concurrent.atomic.AtomicInteger consecutiveFailures = + (java.util.concurrent.atomic.AtomicInteger) consecutiveFailuresField.get(monitorLock); + + // Initially should be 0 + assertEquals(0, consecutiveFailures.get(), "Should start with 0 failures"); + + // Close channel to cause failures + manager.getChannel().close(); + + // Wait for a check to occur + Thread.sleep(400); + + // Should have incremented the failure counter + int failures = consecutiveFailures.get(); + assertTrue(failures > 0, "Should have recorded at least one failure"); + assertTrue(failures <= 3, "Should not exceed maxRetries + 1"); + + manager.stop(); + } + + /** + * Test that configuration values are properly stored + */ + @Test + public void testConfigurationValuesStored() throws Exception { + Configuration config = TestConfigurationBuilder.forLockTesting(-1, 5000, 5); + FileLockNodeManager manager = new FileLockNodeManager(temporaryFolder, false, config, scheduledPool); + + // Access private fields via reflection + Field timeoutField = FileLockNodeManager.class.getDeclaredField("lockMonitorTimeoutNanos"); + timeoutField.setAccessible(true); + long timeout = (long) timeoutField.get(manager); + assertEquals(TimeUnit.MILLISECONDS.toNanos(5000), timeout, "Lock monitor timeout should be stored in nanos"); + + Field retriesField = FileLockNodeManager.class.getDeclaredField("lockMonitorMaxRetries"); + retriesField.setAccessible(true); + int retries = (int) retriesField.get(manager); + assertEquals(5, retries, "Max retries should be stored correctly"); + } +} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManagerTest.java index c49cd6de6c0..29788e727f5 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManagerTest.java @@ -18,13 +18,21 @@ import java.io.File; import java.nio.channels.ClosedChannelException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.tests.extensions.TargetTempDirFactory; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class FileLockNodeManagerTest { @@ -44,4 +52,46 @@ public void testChannelClosed() throws Exception { assertThrows(ClosedChannelException.class, () -> manager.lock(0)); } + + @Test + @Timeout(10) + public void testLockMonitorWithRetries() throws Exception { + ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1); + try { + // Create a FileLockNodeManager with custom lock monitor settings: + // - 200ms check period + // - 2 retries (so it should tolerate up to 2 consecutive failures) + Configuration config = TestConfigurationBuilder.forLockTesting(-1, 200, 2); + FileLockNodeManager manager = new FileLockNodeManager(temporaryFolder, false, config, scheduledPool); + manager.start(); + + AtomicInteger lostLockCallCount = new AtomicInteger(0); + CountDownLatch lostLockLatch = new CountDownLatch(1); + + // Register a listener to track when notifyLostLock is called + manager.registerLockListener(() -> { + lostLockCallCount.incrementAndGet(); + lostLockLatch.countDown(); + }); + + // Start as primary to begin lock monitoring + manager.startPrimaryNode().activationComplete(); + + // Verify the manager is in primary mode + assertTrue(manager.isPrimaryLocked()); + + // Close the channel to simulate a lock failure + // With 2 retries, it should fail 3 times total before calling notifyLostLock + manager.getChannel().close(); + + // Wait for the lock to be declared lost (should take ~600-800ms: 3 check periods) + boolean lostLockNotified = lostLockLatch.await(5, TimeUnit.SECONDS); + assertTrue(lostLockNotified, "Lost lock should have been notified after max retries exceeded"); + assertEquals(1, lostLockCallCount.get(), "Lost lock listener should be called exactly once"); + + manager.stop(); + } finally { + scheduledPool.shutdownNow(); + } + } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/TestConfigurationBuilder.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/TestConfigurationBuilder.java new file mode 100644 index 00000000000..c626c13b709 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/TestConfigurationBuilder.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.config.Configuration; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Helper for creating minimal Configuration mocks for FileLockNodeManager tests. + * This allows tests to use the production Configuration-based constructor + * without needing a full Configuration object. + */ +class TestConfigurationBuilder { + + /** + * Create a minimal Configuration mock with only lock-related settings. + * + * @param lockAcquisitionTimeout journal lock acquisition timeout in milliseconds + * @param lockMonitorTimeout lock monitor check period in milliseconds + * @param lockMonitorMaxRetries maximum retries for failed lock checks + * @return Configuration mock configured for lock testing + */ + static Configuration forLockTesting(long lockAcquisitionTimeout, + long lockMonitorTimeout, + int lockMonitorMaxRetries) { + Configuration config = mock(Configuration.class); + when(config.getJournalLockAcquisitionTimeout()).thenReturn(lockAcquisitionTimeout); + when(config.getLockMonitorTimeout()).thenReturn(lockMonitorTimeout); + when(config.getLockMonitorMaxRetries()).thenReturn(lockMonitorMaxRetries); + return config; + } +} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ColocatedActiveMQServer.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ColocatedActiveMQServer.java index 8df04a79221..4659127e161 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ColocatedActiveMQServer.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ColocatedActiveMQServer.java @@ -65,7 +65,7 @@ public ColocatedActiveMQServer(Configuration configuration, @Override protected NodeManager createNodeManager(final File directory, boolean replicatingBackup) { if (replicatingBackup) { - return new FileLockNodeManager(directory, replicatingBackup, getConfiguration().getJournalLockAcquisitionTimeout(), null); + return new FileLockNodeManager(directory, replicatingBackup, getConfiguration(), null); } else { if (backup) { return nodeManagerBackup;