Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ public static String getDefaultHapolicyBackupStrategy() {
// how long (in ms) to wait to acquire a file lock on the journal
private static long DEFAULT_JOURNAL_LOCK_ACQUISITION_TIMEOUT = -1;

// how long (in ms) between lock health checks
private static long DEFAULT_LOCK_MONITOR_TIMEOUT = 2000;

// how many times to retry a failed lock health check before declaring the lock lost
private static int DEFAULT_LOCK_MONITOR_MAX_RETRIES = 0;

// true means that the server supports wild card routing
private static boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true;

Expand Down Expand Up @@ -827,6 +833,20 @@ public static long getDefaultJournalLockAcquisitionTimeout() {
return DEFAULT_JOURNAL_LOCK_ACQUISITION_TIMEOUT;
}

/**
* how long (in ms) between lock health checks
*/
public static long getDefaultLockMonitorTimeout() {
return DEFAULT_LOCK_MONITOR_TIMEOUT;
}

/**
* how many times to retry a failed lock health check before declaring the lock lost
*/
public static int getDefaultLockMonitorMaxRetries() {
return DEFAULT_LOCK_MONITOR_MAX_RETRIES;
}

/**
* {@code true} means that the server supports wild card routing
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,14 @@ Configuration addDiscoveryGroupConfiguration(String key,

long getJournalLockAcquisitionTimeout();

Configuration setLockMonitorTimeout(long lockMonitorTimeout);

long getLockMonitorTimeout();

Configuration setLockMonitorMaxRetries(int lockMonitorMaxRetries);

int getLockMonitorMaxRetries();

HAPolicyConfiguration getHAPolicyConfiguration();

Configuration setHAPolicyConfiguration(HAPolicyConfiguration haPolicyConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ public class ConfigurationImpl extends javax.security.auth.login.Configuration i

private long journalLockAcquisitionTimeout = ActiveMQDefaultConfiguration.getDefaultJournalLockAcquisitionTimeout();

private long lockMonitorTimeout = ActiveMQDefaultConfiguration.getDefaultLockMonitorTimeout();

private int lockMonitorMaxRetries = ActiveMQDefaultConfiguration.getDefaultLockMonitorMaxRetries();

private HAPolicyConfiguration haPolicyConfiguration;

private StoreConfiguration storeConfiguration;
Expand Down Expand Up @@ -3010,7 +3014,7 @@ public int hashCode() {
scheduledThreadPoolMaxSize, securityEnabled, populateValidatedUser,
securityInvalidationInterval, securitySettings, serverDumpInterval, threadPoolMaxSize,
transactionTimeout, transactionTimeoutScanPeriod, wildcardConfiguration, resolveProtocols,
journalLockAcquisitionTimeout, connectionTtlCheckInterval);
journalLockAcquisitionTimeout, lockMonitorTimeout, lockMonitorMaxRetries, connectionTtlCheckInterval);
}

@Override
Expand Down Expand Up @@ -3091,6 +3095,8 @@ public boolean equals(Object obj) {
Objects.equals(wildcardConfiguration, other.wildcardConfiguration) &&
resolveProtocols == other.resolveProtocols &&
journalLockAcquisitionTimeout == other.journalLockAcquisitionTimeout &&
lockMonitorTimeout == other.lockMonitorTimeout &&
lockMonitorMaxRetries == other.lockMonitorMaxRetries &&
connectionTtlCheckInterval == other.connectionTtlCheckInterval &&
journalDatasync == other.journalDatasync &&
Objects.equals(globalMaxSize, other.globalMaxSize) &&
Expand Down Expand Up @@ -3132,6 +3138,28 @@ public long getJournalLockAcquisitionTimeout() {
return journalLockAcquisitionTimeout;
}

@Override
public ConfigurationImpl setLockMonitorTimeout(long lockMonitorTimeout) {
this.lockMonitorTimeout = lockMonitorTimeout;
return this;
}

@Override
public long getLockMonitorTimeout() {
return lockMonitorTimeout;
}

@Override
public ConfigurationImpl setLockMonitorMaxRetries(int lockMonitorMaxRetries) {
this.lockMonitorMaxRetries = lockMonitorMaxRetries;
return this;
}

@Override
public int getLockMonitorMaxRetries() {
return lockMonitorMaxRetries;
}

@Override
public HAPolicyConfiguration getHAPolicyConfiguration() {
return haPolicyConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,10 @@ public void parseMainConfig(final Element e, final Configuration config) throws

config.setJournalLockAcquisitionTimeout(getLong(e, "journal-lock-acquisition-timeout", config.getJournalLockAcquisitionTimeout(), MINUS_ONE_OR_GT_ZERO));

config.setLockMonitorTimeout(getLong(e, "lock-monitor-timeout", config.getLockMonitorTimeout(), GT_ZERO));

config.setLockMonitorMaxRetries(getInteger(e, "lock-monitor-max-retries", config.getLockMonitorMaxRetries(), GE_ZERO));

if (e.hasAttribute("wild-card-routing-enabled")) {
config.setWildcardRoutingEnabled(getBoolean(e, "wild-card-routing-enabled", config.isWildcardRoutingEnabled()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,12 +637,12 @@ protected NodeManager createNodeManager(final File directory, boolean replicatin
} else if (haType == null || haType == HAPolicyConfiguration.TYPE.PRIMARY_ONLY) {
logger.debug("Detected no Shared Store HA options on JDBC store");
//PRIMARY_ONLY should be the default HA option when HA isn't configured
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout(), scheduledPool);
manager = new FileLockNodeManager(directory, replicatingBackup, configuration, scheduledPool);
} else {
throw new IllegalArgumentException("JDBC persistence allows only Shared Store HA options");
}
} else {
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout(), scheduledPool);
manager = new FileLockNodeManager(directory, replicatingBackup, configuration, scheduledPool);
}
return manager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
Expand Down Expand Up @@ -71,29 +74,35 @@ public class FileLockNodeManager extends FileBasedNodeManager {

private final long lockAcquisitionTimeoutNanos;

private final long lockMonitorTimeoutNanos;

private final int lockMonitorMaxRetries;

protected boolean interrupted = false;

private final ScheduledExecutorService scheduledPool;

public FileLockNodeManager(final File directory, boolean replicatedBackup, ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory);
this.scheduledPool = scheduledPool;
this.lockAcquisitionTimeoutNanos = -1;
}

public FileLockNodeManager(final File directory, boolean replicatedBackup) {
super(replicatedBackup, directory);
this.scheduledPool = null;
this.lockAcquisitionTimeoutNanos = -1;
long timeout = ActiveMQDefaultConfiguration.getDefaultJournalLockAcquisitionTimeout();
this.lockAcquisitionTimeoutNanos = timeout == -1 ? -1 : TimeUnit.MILLISECONDS.toNanos(timeout);
this.lockMonitorTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(
ActiveMQDefaultConfiguration.getDefaultLockMonitorTimeout());
this.lockMonitorMaxRetries = ActiveMQDefaultConfiguration.getDefaultLockMonitorMaxRetries();
}

public FileLockNodeManager(final File directory,
boolean replicatedBackup,
long lockAcquisitionTimeout,
Configuration configuration,
ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory);
this.scheduledPool = scheduledPool;
this.lockAcquisitionTimeoutNanos = lockAcquisitionTimeout == -1 ? -1 : TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeout);
this.lockAcquisitionTimeoutNanos = configuration.getJournalLockAcquisitionTimeout() == -1
? -1
: TimeUnit.MILLISECONDS.toNanos(configuration.getJournalLockAcquisitionTimeout());
this.lockMonitorTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(configuration.getLockMonitorTimeout());
this.lockMonitorMaxRetries = configuration.getLockMonitorMaxRetries();
}

@Override
Expand Down Expand Up @@ -477,7 +486,7 @@ protected FileLock lock(final int lockPosition) throws ClosedChannelException, A
private synchronized void startLockMonitoring() {
logger.debug("Starting the lock monitor");
if (monitorLock == null) {
monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_NANOS, LOCK_MONITOR_TIMEOUT_NANOS, TimeUnit.NANOSECONDS, false);
monitorLock = new MonitorLock(scheduledPool, lockMonitorTimeoutNanos, lockMonitorTimeoutNanos, TimeUnit.NANOSECONDS, false, lockMonitorMaxRetries);
monitorLock.start();
} else {
logger.debug("Lock monitor was already started");
Expand All @@ -503,20 +512,28 @@ protected synchronized void notifyLostLock() {

// This has been introduced to help ByteMan test testLockMonitorInvalid on JDK 11: sun.nio.ch.FileLockImpl::isValid
// can affecting setPrimary, causing an infinite loop due to java.nio.channels.OverlappingFileLockException on tryLock
private boolean isPrimaryLockLost() {
// Made protected to allow testing with controlled lock failures.
// Subclasses overriding this method should maintain the contract: return true when the primary lock is lost,
// false when the lock is healthy. The return value directly controls whether MonitorLock triggers notifyLostLock().
protected boolean isPrimaryLockLost() {
final FileLock lock = this.primaryLock;
return (lock != null && !lock.isValid()) || lock == null;
}

private MonitorLock monitorLock;

public class MonitorLock extends ActiveMQScheduledComponent {
private final int maxRetries;
private final AtomicInteger consecutiveFailures = new AtomicInteger(0);

public MonitorLock(ScheduledExecutorService scheduledExecutorService,
long initialDelay,
long checkPeriod,
TimeUnit timeUnit,
boolean onDemand) {
boolean onDemand,
int maxRetries) {
super(scheduledExecutorService, initialDelay, checkPeriod, timeUnit, onDemand);
this.maxRetries = maxRetries;
}


Expand Down Expand Up @@ -556,8 +573,19 @@ public void run() {
}

if (lostLock) {
logger.warn("Lost the lock according to the monitor, notifying listeners");
notifyLostLock();
int failures = consecutiveFailures.incrementAndGet();
if (failures > maxRetries) {
logger.warn("Lost the lock according to the monitor after {} failed check(s), notifying listeners", failures);
notifyLostLock();
} else {
logger.warn("Lock check failed ({}/{}), will retry on next check period", failures, maxRetries + 1);
}
} else {
// Reset the failure counter on successful check
int previousFailures = consecutiveFailures.getAndSet(0);
if (previousFailures > 0) {
logger.info("Lock check succeeded after {} previous failure(s), resetting failure counter", previousFailures);
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions artemis-server/src/main/resources/schema/artemis-configuration.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,22 @@
</xsd:annotation>
</xsd:element>

<xsd:element name="lock-monitor-timeout" type="xsd:long" default="2000" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
how long (in ms) between lock health checks
</xsd:documentation>
</xsd:annotation>
</xsd:element>

<xsd:element name="lock-monitor-max-retries" type="xsd:int" default="0" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
how many times to retry a failed lock health check before declaring the lock lost (useful for transient storage outages)
</xsd:documentation>
</xsd:annotation>
</xsd:element>

<xsd:element name="wild-card-routing-enabled" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ public void testDefaults() {
assertEquals(ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir(), conf.getLargeMessagesDirectory());
assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), conf.getJournalCompactPercentage());
assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalLockAcquisitionTimeout(), conf.getJournalLockAcquisitionTimeout());
assertEquals(ActiveMQDefaultConfiguration.getDefaultLockMonitorTimeout(), conf.getLockMonitorTimeout());
assertEquals(ActiveMQDefaultConfiguration.getDefaultLockMonitorMaxRetries(), conf.getLockMonitorMaxRetries());
assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, conf.getJournalBufferTimeout_AIO());
assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, conf.getJournalBufferTimeout_NIO());
assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public void testDefaults() {

assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalLockAcquisitionTimeout(), conf.getJournalLockAcquisitionTimeout());

assertEquals(ActiveMQDefaultConfiguration.getDefaultLockMonitorTimeout(), conf.getLockMonitorTimeout());

assertEquals(ActiveMQDefaultConfiguration.getDefaultLockMonitorMaxRetries(), conf.getLockMonitorMaxRetries());

assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalMinFiles(), conf.getJournalMinFiles());

assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), conf.getJournalMaxIO_AIO());
Expand Down
Loading