Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ public static Set<String> splitCommaDelimited(String src)

public volatile DurationSpec.LongMillisecondsBound stream_transfer_task_timeout = new DurationSpec.LongMillisecondsBound("12h");

/**
* Timeout for the per-message window when a non-CMS node sends a TCM_COMMIT_REQ to a CMS node.
* The CMS will attempt Paxos retries for (cms_await_timeout - write_request_timeout) before
* returning an explicit failure, giving the sender time to reschedule before its message callback fires.
*
* WARNING: cms_await_timeout should be substantially larger than write_request_timeout.
* A single Paxos CAS attempt can take up to (cas_contention_timeout + write_request_timeout) to
* complete. If cms_await_timeout is set close to write_request_timeout the deadline reduction has
* no effect and many concurrent CMS operations timing out at the same time may create a thundering herd,
* all retrying against the CMS.
* Default 2 minutes to match "nodetool cms initialize".
*/
public volatile DurationSpec.LongMillisecondsBound cms_await_timeout = new DurationSpec.LongMillisecondsBound("120000ms");
public volatile int cms_default_max_retries = 10;
@Deprecated(since="6.0")
Expand All @@ -192,6 +204,39 @@ public static Set<String> splitCommaDelimited(String src)
public String cms_retry_delay = "50ms*attempts <= 500ms ... 100ms*attempts <= 1s,retries=10";

public volatile CMSCommitMemberPreferencePolicy cms_commit_member_preference_policy = CMSCommitMemberPreferencePolicy.random;
/**
* Controls the sender-side retry behavior for CMS commits (topology changes,
* CMS reconfiguration, node registration — everything except STARTUP and schema DDL
* with an explicit client deadline).
*
* cms_commit_timeout: Overall deadline for the commit to succeed. The sender retries
* with exponential backoff until this deadline expires. Each retry sends a fresh
* TCM_COMMIT_REQ to a (possibly different) CMS node. Set this longer than the
* expected total time for all concurrent operations to drain through the Paxos log.
*
* cms_commit_retry_initial_delay: Base delay for Full Jitter exponential backoff.
* Actual delay per retry = uniform_random(0, min(max_delay, initial_delay * 2^attempt)).
* Higher values reduce Paxos contention at the cost of slower progress when the log
* is lightly loaded. 5s is a good default — it spaces retries enough to avoid
* thundering herd while still making progress within minutes.
*
* cms_commit_retry_max_delay: Cap on the exponential backoff. Once 2^attempt growth
* exceeds this, all subsequent retries draw from uniform_random(0, max_delay).
* 60s keeps retries frequent enough that a freed Paxos slot is filled within
* ~30s on average, while avoiding retry storms.
*
* When to change:
* - If bulk topology ops complete but take too long: reduce max_delay (e.g. 30s)
* to retry more aggressively. Monitor CommitRetries rate for contention impact.
* - If bulk topology ops fail (timeout): increase cms_commit_timeout.
* - If Paxos contention is extremely high (>100 concurrent commits): increase
* initial_delay to 10-15s to spread the retry wavefront.
* - All three are hot-settable via JMX without restart.
*/
public volatile DurationSpec.LongMillisecondsBound cms_commit_timeout = new DurationSpec.LongMillisecondsBound("1h");
public volatile DurationSpec.LongMillisecondsBound cms_commit_retry_initial_delay = new DurationSpec.LongMillisecondsBound("5s");
public volatile DurationSpec.LongMillisecondsBound cms_commit_retry_max_delay = new DurationSpec.LongMillisecondsBound("60s");

public volatile int epoch_aware_debounce_inflight_tracker_max_size = 100;

/**
Expand Down
92 changes: 92 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.CacheService.CacheType;
import org.apache.cassandra.service.FileSystemOwnershipCheck;
import org.apache.cassandra.service.RetryStrategy;
import org.apache.cassandra.service.StartupChecks;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.TimeoutStrategy;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.api.AccordWaitStrategies;
import org.apache.cassandra.service.consensus.TransactionalMode;
Expand Down Expand Up @@ -268,6 +270,15 @@ public class DatabaseDescriptor

public static volatile boolean allowUnlimitedConcurrentValidations = ALLOW_UNLIMITED_CONCURRENT_VALIDATIONS.getBoolean();

/**
* RetryStrategy which provides exponential backoff with full jitter, for use by both CMS and non-CMS members
* when submitting a Commit request. The range and increments of the backoff times are defined by
* conf.cms_commit_retry_initial_delay and conf.cms_commit_retry_max_delay. Both are hot properties and so
* changing either one causes this retry strategy to be reconstructed.
*/
private static volatile RetryStrategy cms_commit_retry_strategy;


/**
* The configuration for guardrails.
*/
Expand Down Expand Up @@ -578,6 +589,8 @@ private static void applyAll() throws ConfigurationException

applyAccord();

applyCMS();

applyStartupChecks();
}

Expand Down Expand Up @@ -1384,6 +1397,25 @@ private static void applyAccord()
AccordService.applyProtocolModifiers(getAccord());
}

private static void applyCMS()
{
try
{
long initialDelayMs = conf.cms_commit_retry_initial_delay.to(TimeUnit.MILLISECONDS);
long maxDelayMs = conf.cms_commit_retry_max_delay.to(TimeUnit.MILLISECONDS);
// range of backoff wait time starts at 0ms backing off exponentially at initialDelayMs * 2^attempts
String spec = String.format("0ms ... %dms * 2^attempts <= %dms", initialDelayMs, maxDelayMs);
logger.debug("Initializing cms_commit_retry_strategy from spec: " + spec);
cms_commit_retry_strategy = RetryStrategy.parse(spec,
TimeoutStrategy.LatencySourceFactory.none(),
RetryStrategy.randomizers.uniform());
}
catch (Exception e)
{
throw new ConfigurationException("Invalid configuration for cms_commit_retry_strategy. " + e.getMessage(), e);
}
}

public static StartupChecksConfiguration getStartupChecksConfiguration()
{
return startupChecksConfiguration;
Expand Down Expand Up @@ -6098,6 +6130,15 @@ public static Config.CMSCommitMemberPreferencePolicy getCmsCommitMemberPreferenc
return conf.cms_commit_member_preference_policy;
}

public static void setCmsAwaitTimeout(long timeoutInMillis)
{
if (timeoutInMillis != conf.cms_await_timeout.to(TimeUnit.MILLISECONDS))
{
logger.info("Setting cms_await_timeout to {}ms", timeoutInMillis);
conf.cms_await_timeout = new DurationSpec.LongMillisecondsBound(timeoutInMillis);
}
}

public static void setCmsCommitMemberPreferencePolicy(Config.CMSCommitMemberPreferencePolicy policy)
{
conf.cms_commit_member_preference_policy = policy;
Expand All @@ -6108,6 +6149,57 @@ public static void setCmsCommitMemberPreferencePolicy(String policy)
setCmsCommitMemberPreferencePolicy(Config.CMSCommitMemberPreferencePolicy.valueOf(toLowerCaseLocalized(policy)));
}

public static DurationSpec getCmsCommitTimeout()
{
return conf.cms_commit_timeout;
}

public static void setCmsCommitTimeout(long timeoutInMillis)
{
if (timeoutInMillis != conf.cms_commit_timeout.to(TimeUnit.MILLISECONDS))
{
logger.info("Setting cms_commit_timeout to {}ms", timeoutInMillis);
conf.cms_commit_timeout = new DurationSpec.LongMillisecondsBound(timeoutInMillis);
}
}



public static DurationSpec getCmsCommitRetryInitialDelay()
{
return conf.cms_commit_retry_initial_delay;
}

public static void setCmsCommitRetryInitialDelay(long delayInMillis)
{
if (delayInMillis != conf.cms_commit_retry_initial_delay.to(TimeUnit.MILLISECONDS))
{
logger.info("Setting cms_commit_retry_initial_delay to {}ms", delayInMillis);
conf.cms_commit_retry_initial_delay = new DurationSpec.LongMillisecondsBound(delayInMillis);
applyCMS();
}
}

public static DurationSpec getCmsCommitRetryMaxDelay()
{
return conf.cms_commit_retry_max_delay;
}

public static void setCmsCommitRetryMaxDelay(long delayInMillis)
{
if (delayInMillis != conf.cms_commit_retry_max_delay.to(TimeUnit.MILLISECONDS))
{
logger.info("Setting cms_commit_retry_max_delay to {}ms", delayInMillis);
conf.cms_commit_retry_max_delay = new DurationSpec.LongMillisecondsBound(delayInMillis);
applyCMS();
}
}

public static RetryStrategy getCmsCommitRetryStrategy()
{
return cms_commit_retry_strategy;
}

public static int getEpochAwareDebounceInFlightTrackerMaxSize()
{
return conf.epoch_aware_debounce_inflight_tracker_max_size;
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/net/Verb.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ public enum Verb

// transactional cluster metadata
TCM_COMMIT_RSP (801, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitResultSerializer, RESPONSE_HANDLER ),
// message timeout is overridden in RemoteProcessor to cmsAwaitTimeout for non-client facing commit requests
TCM_COMMIT_REQ (802, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitSerializer, () -> commitRequestHandler(), TCM_COMMIT_RSP ),
TCM_FETCH_CMS_LOG_RSP (803, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ),
TCM_FETCH_CMS_LOG_REQ (804, P0, rpcTimeout, FETCH_METADATA, () -> FetchCMSLog.serializer, () -> fetchLogRequestHandler(), TCM_FETCH_CMS_LOG_RSP ),
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/service/RetryStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ public long computeWait(int attempt, TimeUnit units)
if (min > maxMinMicros)
min = maxMinMicros;
long max = this.max.getMicros(attempt);
if (max > maxMaxMicros)
max = maxMaxMicros;
result = min >= max ? min : waitRandomizer.wait(min, max, attempt);
}

Expand Down
36 changes: 30 additions & 6 deletions src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.tcm.log.Entry;
import org.apache.cassandra.tcm.log.LocalLog;
import org.apache.cassandra.tcm.log.LogState;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.exceptions.ExceptionCode.INVALID;
import static org.apache.cassandra.exceptions.ExceptionCode.SERVER_ERROR;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;

public abstract class AbstractLocalProcessor implements Processor
{
Expand All @@ -50,6 +53,9 @@ public AbstractLocalProcessor(LocalLog log)
@Override
public final Commit.Result commit(Entry.Id entryId, Transformation transform, final Epoch lastKnown, Retry retryPolicy)
{
String transformStr = transform.toString(); // convert once as idempotent and used in multiple logs
logger.debug("Starting local commit of {} with policy {}", transformStr, retryPolicy);
long commitStart = nanoTime();
while (!retryPolicy.hasExpired())
{
ClusterMetadata previous = log.waitForHighestConsecutive();
Expand All @@ -66,7 +72,7 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
Transformation.Result result;
if (!transform.eligibleToCommit(previous))
{
result = new Transformation.Rejected(INVALID, "Transformation rejected, can't commit " + transform +
result = new Transformation.Rejected(INVALID, "Transformation rejected, can't commit " + transformStr +
" it not supported with cluster common serialization version " + previous.directory.commonSerializationVersion +
" and min/max serialization versions " + previous.directory.clusterMinVersion + "/" + previous.directory.clusterMaxVersion);
}
Expand All @@ -79,7 +85,11 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
// Just try to catch up to the latest distributed state.
if (result.isRejected())
{
ClusterMetadata replayed = fetchLogAndWait(null, retryPolicy);
// Use a dedicated retry policy here as the one for the commit itself may not be appropriate.
// It uses the wrong metric and for STARTUP transformations will retry indefinitely, which is not
// what we want here.
Retry fetchLogRetry = Retry.until(retryPolicy.deadlineNanos, TCMMetrics.instance.fetchLogRetries);
ClusterMetadata replayed = fetchLogAndWait(null, fetchLogRetry);

// Retry if replay has changed the epoch, return rejection otherwise.
if (!replayed.epoch.isAfter(previous.epoch))
Expand All @@ -96,12 +106,18 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
{
Epoch nextEpoch = result.success().metadata.epoch;
// If metadata applies, try committing it to the log
long casStart = nanoTime();
boolean applied = tryCommitOne(entryId, transform, previous.epoch, nextEpoch);
long casElapsedUs = NANOSECONDS.toMicros(nanoTime() - casStart);
logger.debug("tryCommitOne for {} epoch {}->{}: applied={}, took {}us",
transform.kind(), previous.epoch, nextEpoch, applied, casElapsedUs);

// Application here semantially means "succeeded in committing to the distributed log".
if (applied)
{
logger.info("Committed {}. New epoch is {}", transform, nextEpoch);
logger.info("Committed {}. New epoch is {}. Took {} attempts in {}us total.",
transformStr, nextEpoch, retryPolicy.attempts(),
NANOSECONDS.toMicros(nanoTime() - commitStart));
log.append(new Entry(entryId, nextEpoch, new Transformation.Executed(transform, result)));
log.awaitAtLeast(nextEpoch);

Expand All @@ -113,7 +129,11 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
if (!retryPolicy.maybeSleep())
break;
// TODO: could also add epoch from mis-application from [applied].
fetchLogAndWait(null, retryPolicy);
// Use a dedicated retry policy here as the one for the commit itself may not be appropriate.
// It uses the wrong metric and for STARTUP transformations will retry indefinitely, which is not
// what we want here.
Retry fetchLogRetry = Retry.until(retryPolicy.deadlineNanos, TCMMetrics.instance.fetchLogRetries);
fetchLogAndWait(null, fetchLogRetry);
}
}
catch (Throwable e)
Expand All @@ -124,8 +144,12 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
break;
}
}
return Commit.Result.failed(SERVER_ERROR,
String.format("Could not perform commit; policy %s gave up", retryPolicy));
long remainingMillis = NANOSECONDS.toMillis(retryPolicy.remainingNanos());
String failureMsg = String.format("Could not perform commit after %d attempts. Time remaining: %dms",
retryPolicy.attempts(), remainingMillis);
logger.debug("Commit {} failed in {}us total. {}",
transformStr, NANOSECONDS.toMicros(nanoTime() - commitStart), failureMsg);
return Commit.Result.failed(SERVER_ERROR, failureMsg);
}

public Commit.Result maybeFailure(Entry.Id entryId, Epoch lastKnown, Supplier<Commit.Result.Failure> orElse)
Expand Down
62 changes: 62 additions & 0 deletions src/java/org/apache/cassandra/tcm/CMSOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,6 +51,7 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;

public class CMSOperations implements CMSOperationsMBean
Expand Down Expand Up @@ -80,6 +83,65 @@ private CMSOperations(ClusterMetadataService cms)
this.cms = cms;
}

// TCM CMS await timeout
public long getCmsAwaitTimeoutMillis()
{
return DatabaseDescriptor.getCmsAwaitTimeout().to(MILLISECONDS);
}

public void setCmsAwaitTimeoutMillis(long timeoutInMillis)
{
Preconditions.checkState(timeoutInMillis > 0);
DatabaseDescriptor.setCmsAwaitTimeout(timeoutInMillis);
}

// CMS commit timeout with exponential backoff
public long getCmsCommitTimeoutMillis()
{
return DatabaseDescriptor.getCmsCommitTimeout().to(MILLISECONDS);
}

public void setCmsCommitTimeoutMillis(long timeoutInMillis)
{
Preconditions.checkState(timeoutInMillis > 0);
DatabaseDescriptor.setCmsCommitTimeout(timeoutInMillis);
}

public long getCmsCommitRetryInitialDelayMillis()
{
return DatabaseDescriptor.getCmsCommitRetryInitialDelay().to(MILLISECONDS);
}

public void setCmsCommitRetryInitialDelayMillis(long delayInMillis)
{
Preconditions.checkState(delayInMillis > 0);
DatabaseDescriptor.setCmsCommitRetryInitialDelay(delayInMillis);
}

public long getCmsCommitRetryMaxDelayMillis()
{
return DatabaseDescriptor.getCmsCommitRetryMaxDelay().to(MILLISECONDS);
}

public void setCmsCommitRetryMaxDelayMillis(long delayInMillis)
{
Preconditions.checkState(delayInMillis > 0);
DatabaseDescriptor.setCmsCommitRetryMaxDelay(delayInMillis);
}

@Override
public String getCmsCommitMemberPreferencePolicy()
{
return DatabaseDescriptor.getCmsCommitMemberPreferencePolicy().name();
}

@Override
public void setCmsCommitMemberPreferencePolicy(String policy)
{
DatabaseDescriptor.setCmsCommitMemberPreferencePolicy(policy);
logger.info("Set cms_commit_member_preference_policy to {}", policy);
}

@Override
public void initializeCMS(List<String> ignoredEndpoints)
{
Expand Down
Loading