Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 79 additions & 11 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,55 @@ bool check_compaction_input_verions(const TabletCompactionJobPB& compaction,
return false;
}

// Normalize compaction type for conflict detection.
// EMPTY_CUMULATIVE only updates cumulative_point and cumulative_compaction_cnt without producing
// any output rowset. It MUST be considered the same family as CUMULATIVE so that an EMPTY_CUMULATIVE
// cannot be accepted while a real CUMULATIVE is still running on the same tablet (which would
// otherwise advance cumulative_point past the input range of the in-flight cumu and let base
// compaction race with it).
static inline TabletCompactionJobPB::CompactionType normalize_compaction_type_for_conflict(
TabletCompactionJobPB::CompactionType t) {
return t == TabletCompactionJobPB::EMPTY_CUMULATIVE ? TabletCompactionJobPB::CUMULATIVE : t;
}

// Two compaction jobs are considered to be in the same conflict family iff their normalized
// compaction types are equal, OR either side is FULL (full compaction conflicts with anything),
// OR they are different in (BASE, CUMULATIVE) - in this case the conflict still depends on whether
// their input version ranges overlap, callers should additionally consult version range checks.
static inline bool is_same_conflict_family(TabletCompactionJobPB::CompactionType a,
TabletCompactionJobPB::CompactionType b) {
return normalize_compaction_type_for_conflict(a) == normalize_compaction_type_for_conflict(b);
}

// Whether a compaction type belongs to the "rowset compaction family", i.e. any compaction
// kind that operates on rowsets bounded by `cumulative_point`. Only members of this family
// are subject to the cross-type version-range conflict check below. Note that STOP_TOKEN is
// already filtered out by the caller (see the early-return on STOP_TOKEN in start_compaction_job),
// so it is intentionally excluded here.
static inline bool is_rowset_compaction_family(TabletCompactionJobPB::CompactionType t) {
return t == TabletCompactionJobPB::BASE || t == TabletCompactionJobPB::CUMULATIVE ||
t == TabletCompactionJobPB::EMPTY_CUMULATIVE || t == TabletCompactionJobPB::FULL;
}

// Whether two compaction jobs MAY conflict on the rowset range (regardless of the actual
// input version range). The caller still needs to compare `input_versions` to make the
// final decision.
//
// Conflict matrix (Plan D extends this from same-type-only to cross-type within the family):
// FULL vs anything-in-family : true (full compaction touches the whole rowset range)
// BASE vs BASE / CUMULATIVE : true (their rowset ranges may overlap around cumu_point)
// CUMU vs CUMU / BASE : true (symmetric of the above)
// EMPTY_CU vs anything-in-family : true (it advances cumu_point and would race with the others)
//
// Any compaction type outside this family (or that we don't yet model) is conservatively NOT
// considered conflicting here - if a new type is added later, the author MUST revisit this
// function and decide its conflict semantics explicitly, instead of silently inheriting the
// "everything conflicts" behaviour.
static inline bool may_conflict_by_type(TabletCompactionJobPB::CompactionType a,
TabletCompactionJobPB::CompactionType b) {
return is_rowset_compaction_family(a) && is_rowset_compaction_family(b);
}

void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss,
std::unique_ptr<Transaction>& txn, const StartTabletJobRequest* request,
StartTabletJobResponse* response, std::string& instance_id,
Expand Down Expand Up @@ -250,10 +299,16 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst
compaction.input_versions().empty()) ||
(compaction.has_check_input_versions_range() &&
!compaction.check_input_versions_range())) {
// Unknown input version range, doesn't support parallel compaction of same type
// Unknown input version range, doesn't support parallel compaction of same family.
// EMPTY_CUMULATIVE is normalized to CUMULATIVE here so it conflicts with an in-flight
// CUMULATIVE on the same tablet (otherwise EMPTY_CUMULATIVE could advance
// cumulative_point past the in-flight cumu's input range and let base compaction race
// with it).
for (auto& c : compactions) {
if (c.type() != compaction.type() && c.type() != TabletCompactionJobPB::FULL)
if (!is_same_conflict_family(c.type(), compaction.type()) &&
c.type() != TabletCompactionJobPB::FULL) {
continue;
}
if (c.id() == compaction.id()) return; // Same job, return OK to keep idempotency
msg = fmt::format("compaction has already started, tablet_id={} job={}", tablet_id,
proto_to_json(c));
Expand All @@ -270,23 +325,36 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst
a.input_versions(1) < b.input_versions(0);
};
for (auto& c : compactions) {
if (c.type() != compaction.type() && c.type() != TabletCompactionJobPB::FULL)
continue;
// Plan D: BASE and CUMULATIVE on the same tablet may also conflict when their
// input version ranges overlap. Previously we only checked same-type conflicts,
// which left a window where BASE could be accepted with versions that overlap an
// in-flight CUMULATIVE (and vice versa) after cumulative_point was unsafely
// advanced. Now we treat any pair within (BASE, CUMULATIVE, EMPTY_CUMULATIVE,
// FULL) as potentially conflicting and rely on the input-version-range check
// below to make the final decision.
if (!may_conflict_by_type(c.type(), compaction.type())) continue;
if (c.input_versions_size() > 0 && version_not_conflict(c, compaction)) continue;
if (c.id() == compaction.id()) return; // Same job, return OK to keep idempotency
msg = fmt::format("compaction has already started, tablet_id={} job={}", tablet_id,
proto_to_json(c));
code = MetaServiceCode::JOB_TABLET_BUSY;
// Unknown version range of started compaction, BE should not retry other version range
if (c.input_versions_size() == 0) return;
// Notify version ranges in started compaction to BE, so BE can retry other version range
// Notify version ranges of all in-flight compactions that may conflict with the
// incoming one, so BE can retry on a non-overlapping range. The notification
// predicate is intentionally kept consistent with the conflict predicate above
// (`may_conflict_by_type`); previously only same-family ranges were surfaced,
// which left BE blind to cross-family (BASE vs CUMULATIVE) conflicts.
//
// An in-flight EMPTY_CUMULATIVE (or any other family member without a concrete
// [v_lo, v_hi]) carries no usable range; surfacing fabricated zeros would
// mislead BE retry. Skip such entries defensively here - the real conflict is
// already enforced by the version-range check above.
for (auto& c : compactions) {
if (c.type() == compaction.type() || c.type() == TabletCompactionJobPB::FULL) {
// If there are multiple started compaction of same type, they all must has input version range
DCHECK_EQ(c.input_versions_size(), 2) << proto_to_json(c);
response->add_version_in_compaction(c.input_versions(0));
response->add_version_in_compaction(c.input_versions(1));
}
if (!may_conflict_by_type(c.type(), compaction.type())) continue;
if (c.input_versions_size() != 2) continue;
response->add_version_in_compaction(c.input_versions(0));
response->add_version_in_compaction(c.input_versions(1));
}
return;
}
Expand Down
Loading
Loading