From 3953d52b5477d3ea24ccd85aef3b4d2bcf0555f2 Mon Sep 17 00:00:00 2001 From: Lijia Liu Date: Mon, 22 Jun 2026 20:40:01 +0800 Subject: [PATCH] [fix](cloud-compaction) prevent EMPTY_CUMULATIVE / BASE-CUMU races on the same tablet (#64619) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug --- Reproduces when the BE config enable_parallel_cumu_compaction is set to true. --- On the meta-service, start_compaction_job only rejected a new job when its type strictly equalled an in-flight job's type. This left two races: 1. EMPTY_CUMULATIVE was treated as a different type from CUMULATIVE. While a real CUMULATIVE [v_lo, v_hi] was still running, an EMPTY_CUMULATIVE could be accepted and committed, advancing cumulative_point past v_hi. A subsequent BASE compaction could then pull rowsets in [v_lo, v_hi] as input and race with the in-flight CUMULATIVE on the same rowsets. 2. With check_input_versions_range=true, BASE and CUMULATIVE were never cross-checked against each other, so overlapping input ranges across the two types could be accepted concurrently. Fix --- * Normalize EMPTY_CUMULATIVE to CUMULATIVE for conflict detection so they belong to the same conflict family. * Extend the version-range conflict check to the whole rowset compaction family (BASE / CUMULATIVE / EMPTY_CUMULATIVE / FULL) instead of same-type only. Non-overlapping ranges across types are still allowed. * Report version_in_compaction for every in-flight job in the rowset compaction family that carries a concrete input range (not only same-type jobs), so BE can retry on a non-overlapping range. 
Behaviour matrix (new -> active, OK = accept, BUSY = JOB_TABLET_BUSY) --------------------------------------------------------------------- before after EMPTY_CUMU vs CUMU OK (race) BUSY CUMU vs EMPTY_CUMU OK BUSY BASE vs CUMU overlap OK (race) BUSY CUMU vs BASE overlap OK (race) BUSY BASE vs CUMU disjoint OK OK (unchanged) same-type / FULL / STOP_TOKEN / idempotent same-id : unchanged Tests ----- * EmptyCumulativeBlockedByCumulativeTest * BaseCumulativeCrossTypeConflictTest The cluster log: ``` 1. start cc -------------- start cc(42326-42474) ------------ RuntimeLogger I20260616 06:05:58.687474 1715 cloud_cumulative_compaction.cpp:111] start CloudCumulativeCompaction, tablet_id=1763693245218, range=[42326-42474]|job_id=7c02be46-86a3-43b7-9687-9c93b1f3affe|input_rowsets=5|input_rows=427170|input_segments=5|input_rowsets_data_size=52916937|input_rowsets_index_size=0|input_rowsets_total_size=52916937|tablet_max_version=42475|cumulative_point=42326|num_rowsets=27|cumu_num_rowsets=6 -------------- meta service record this cc -------------- I20260616 06:05:58.687247 3747094 meta_service_helper.h:174] begin start_tablet_job remote_caller=10.2.18.57:52036 original_client_ip=10.2.18.57:9050 request=cloud_unique_id: "1:1753070360:jYdIZgSo" job { idx { table_id: 1753072815281 index_id: 1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } compaction { initiator: "10.2.18.57:9050" type: CUMULATIVE input_versions: 42326 input_versions: 42474 base_compaction_cnt: 91 cumulative_compaction_cnt: 3590 id: "7c02be46-86a3-43b7-9687-9c93b1f3affe" expiration: 1781647558 lease: 1781561238 check_input_versions_range: true } } request_ip: "10.2.18.57:9050" meta_service.10.2.16.38.INFO:I20260616 06:05:58.688797 3747093 meta_service_job.cpp:272] (1753070360)compaction job to save 
job={"initiator":"10.2.18.57:9050","type":"CUMULATIVE","input_versions":["42326","42474"],"base_compaction_cnt":"91","cumulative_compaction_cnt":"3590","id":"7c02be46-86a3-43b7-9687-9c93b1f3affe","expiration":"1781647558","lease":"1781561238","check_input_versions_range":true} 2. A DELETE triggers the increase of the cumulative point. --------- meta service records the EMPTY_CUMULATIVE job. The cumulative point has advanced to 42476---------- I20260616 06:05:58.792258 1621135 meta_service_helper.h:174] begin start_tablet_job remote_caller=10.2.18.57:33586 original_client_ip=10.2.18.57:9050 request=cloud_unique_id: "1:1753070360:jYdIZgSo" job { idx { table_id: 1753072815281 index_id: 1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } compaction { initiator: "10.2.18.57:9050" type: EMPTY_CUMULATIVE base_compaction_cnt: 91 cumulative_compaction_cnt: 3590 id: "92277d99-b14f-42df-bfd8-c26e75ff8052" lease: 1781561178 } } request_ip: "10.2.18.57:9050" I20260616 06:05:58.793903 1621138 meta_service_job.cpp:272] (1753070360)compaction job to save job={"initiator":"10.2.18.57:9050","type":"EMPTY_CUMULATIVE","base_compaction_cnt":"91","cumulative_compaction_cnt":"3590","id":"92277d99-b14f-42df-bfd8-c26e75ff8052","lease":"1781561178"} I20260616 06:05:58.796713 1621137 meta_service_helper.h:174] begin finish_tablet_job remote_caller=10.2.18.57:60878 original_client_ip=10.2.18.57:9050 request=cloud_unique_id: "1:1753070360:jYdIZgSo" action: COMMIT job { idx { table_id: 1753072815281 index_id: 1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } compaction { initiator: "10.2.18.57:9050" type: EMPTY_CUMULATIVE input_cumulative_point: 42326 output_cumulative_point: 42476 base_compaction_cnt: 91 cumulative_compaction_cnt: 3590 id: "92277d99-b14f-42df-bfd8-c26e75ff8052" lease: 1781561178 } } request_ip: "10.2.18.57:9050" --------- BE log the EMPTY_CUMULATIVE -------------- RuntimeLogger I20260616 06:05:58.801268 1715 cloud_cumulative_compaction.cpp:533] do 
empty cumulative compaction to update cumulative point|job_id=92277d99-b14f-42df-bfd8-c26e75ff8052|tablet_id=1763693245218|input_cumulative_point=42326|output_cumulative_point=42476 RuntimeLogger I20260616 06:05:58.801329 1715 cloud_cumulative_compaction.cpp:539] tablet stats=idx { table_id: 1753072815281 index_id: 1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } data_size: 25302189990 num_rows: 303319387 num_rowsets: 27 num_segments: 43 base_compaction_cnt: 91 cumulative_compaction_cnt: 3591 cumulative_point: 42476 last_base_compaction_time_ms: 1781539519000 last_cumu_compaction_time_ms: 1781561158000 index_size: 0 segment_size: 25302189990 RuntimeLogger W20260616 06:05:58.801383 1715 cloud_storage_engine.cpp:529] failed to submit compaction task for tablet: 1763693245218, err: [E-2010]cumulative compaction meet delete version 3. start bc -------------- be record the base compaction (2 ~ 42431) RuntimeLogger I20260616 06:06:01.042435 1715 cloud_base_compaction.cpp:84] start CloudBaseCompaction, tablet_id=1763693245218, range=[2-42431]|job_id=a8e92687-2211-4d74-82aa-d6e99c3fc360|input_rowsets=21|input_rows=303315005|input_segments=39|input_rowsets_data_size=25294029764|input_rowsets_index_size=0|input_rowsets_total_size=25294029764 -------------- meta service record the bc -------------- I20260616 06:06:01.043434 3747099 meta_service_helper.h:174] begin start_tablet_job remote_caller=10.2.18.57:52174 original_client_ip=10.2.18.57:9050 request=cloud_unique_id: "1:1753070360:jYdIZgSo" job { idx { table_id: 1753072815281 index_id: 1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } compaction { initiator: "10.2.18.57:9050" type: BASE input_versions: 2 input_versions: 42431 base_compaction_cnt: 91 cumulative_compaction_cnt: 3591 id: "a8e92687-2211-4d74-82aa-d6e99c3fc360" expiration: 1781647561 lease: 1781561241 } } request_ip: "10.2.18.57:9050" RuntimeLogger I20260616 06:06:13.279381 1708 cloud_cumulative_compaction.cpp:208] finish 
CloudCumulativeCompaction, tablet_id=1763693245218, cost=14587ms, range=[42326-42474]|job_id=7c02be46-86a3-43b7-9687-9c93b1f3affe|input_rowsets=5|input_rows=427170|input_segments=5|input_rowsets_data_size=52916937|input_rowsets_index_size=0|input_rowsets_total_size=52916937|output_rows=427170|output_segments=1|output_rowset_data_size=45147495|output_rowset_index_size=0|output_rowset_total_size=45147495|tablet_max_version=42475|cumulative_point=42476|num_rowsets=23|cumu_num_rowsets=0|local_read_time_us=1407|remote_read_time_us=0|local_read_bytes=5717690|remote_read_bytes=0 4. cc complete -------------- meta service records the cc and drops the rowsets whose versions are between 42326 and 42474 RuntimeLogger I20260616 06:06:13.279381 1708 cloud_cumulative_compaction.cpp:208] finish CloudCumulativeCompaction, tablet_id=1763693245218, cost=14587ms, range=[42326-42474]|job_id=7c02be46-86a3-43b7-9687-9c93b1f3affe|input_rowsets=5|input_rows=427170|input_segments=5|input_rowsets_data_size=52916937|input_rowsets_index_size=0|input_rowsets_total_size=52916937|output_rows=427170|output_segments=1|output_rowset_data_size=45147495|output_rowset_index_size=0|output_rowset_total_size=45147495|tablet_max_version=42475|cumulative_point=42476|num_rowsets=23|cumu_num_rowsets=0|local_read_time_us=1407|remote_read_time_us=0|local_read_bytes=5717690|remote_read_bytes=0 I20260616 06:06:13.269352 3747098 meta_service_helper.h:174] begin finish_tablet_job remote_caller=10.2.18.57:52070 original_client_ip=10.2.18.57:9050 request=cloud_unique_id: "1:1753070360:jYdIZgSo" action: COMMIT job { idx { table_id: 1753072815281 index_id: 1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } compaction { initiator: "10.2.18.57:9050" type: CUMULATIVE input_cumulative_point: 42476 output_cumulative_point: 42475 num_input_rowsets: 5 num_input_segments: 5 num_output_rowsets: 1 num_output_segments: 1 size_input_rowsets: 52916937 size_output_rowsets: 45147495 num_input_rows: 427170 
num_output_rows: 427170 input_versions: 42326 input_versions: 42474 output_versions: 42474 output_rowset_ids: "020000000008d8252c4a4e54ca9c6c96f347dc1417ad5db8" txn_id: 8054901679135295045 id: "7c02be46-86a3-43b7-9687-9c93b1f3affe" index_size_input_rowsets: 0 segment_size_input_rowsets: 52916937 index_size_output_rowsets: 0 segment_size_output_rowsets: 45147495 } } request_ip: "10.2.18.57:9050" 5. bc complete -------------- bc completes and generates new rs (2-42431)-------------- RuntimeLogger I20260616 06:54:02.973222 1699 cloud_base_compaction.cpp:293] finish CloudBaseCompaction, tablet_id=1763693245218, cost=2881925ms range=[2-42431]|job_id=a8e92687-2211-4d74-82aa-d6e99c3fc360|input_rowsets=21|input_rows=303315005|input_segments=39|input_rowsets_data_size=25294029764|input_rowsets_index_size=0|input_rowsets_total=25294029764|output_rows=303315005|output_segments=24|output_rowset_data_size=25176822135|output_rowset_index_size=0|output_rowset_total_size=25176822135|local_read_time_us=1319383|remote_read_time_us=0|local_read_bytes=1154464868|remote_read_bytes=0 ``` The version (2-42431) generated by bc conflicts with the version (42326-42474) generated by cc. The compaction info: ```json { "rowsets": [ "[0-1] 0 DATA NONOVERLAPPING 0200000000000000ffffffffffea4868ffffffffffffffe8 0", "[2-42431] 24 DATA NONOVERLAPPING 020000000008d8262c4a4e54ca9c6c96f347dc1417ad5db8 23.45 GB", "[42326-42474] 1 DATA NONOVERLAPPING 020000000008d8252c4a4e54ca9c6c96f347dc1417ad5db8 43.06 MB", "[42475-42475] 0 DELETE OVERLAP_UNKNOWN 020000000008d8242c4a4e54ca9c6c96f347dc1417ad5db8 0" ], "missing_rowsets": [ "[42432-42325]" ] } ``` ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test - [ ] Regression test - [x] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. 
- [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [ ] Yes. - Does this need documentation? - [ ] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --------- Co-authored-by: liutang123 --- cloud/src/meta-service/meta_service_job.cpp | 90 +++++++- cloud/test/meta_service_job_test.cpp | 215 ++++++++++++++++++++ 2 files changed, 294 insertions(+), 11 deletions(-) diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index dad84f6a36ab0d..35aedd81852c28 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -97,6 +97,55 @@ bool check_compaction_input_verions(const TabletCompactionJobPB& compaction, return false; } +// Normalize compaction type for conflict detection. +// EMPTY_CUMULATIVE only updates cumulative_point and cumulative_compaction_cnt without producing +// any output rowset. It MUST be considered the same family as CUMULATIVE so that an EMPTY_CUMULATIVE +// cannot be accepted while a real CUMULATIVE is still running on the same tablet (which would +// otherwise advance cumulative_point past the input range of the in-flight cumu and let base +// compaction race with it). +static inline TabletCompactionJobPB::CompactionType normalize_compaction_type_for_conflict( + TabletCompactionJobPB::CompactionType t) { + return t == TabletCompactionJobPB::EMPTY_CUMULATIVE ? 
TabletCompactionJobPB::CUMULATIVE : t; +} + +// Two compaction jobs are in the same conflict family iff their normalized compaction types +// are equal (EMPTY_CUMULATIVE counts as CUMULATIVE). This helper does NOT special-case FULL and +// does NOT treat BASE and CUMULATIVE as one family: callers check an in-flight FULL explicitly, +// and cross-type version-range conflicts are decided separately via may_conflict_by_type(). +static inline bool is_same_conflict_family(TabletCompactionJobPB::CompactionType a, + TabletCompactionJobPB::CompactionType b) { + return normalize_compaction_type_for_conflict(a) == normalize_compaction_type_for_conflict(b); +} + +// Whether a compaction type belongs to the "rowset compaction family", i.e. any compaction +// kind that operates on rowsets bounded by `cumulative_point`. Only members of this family +// are subject to the cross-type version-range conflict check below. Note that STOP_TOKEN is +// already filtered out by the caller (see the early-return on STOP_TOKEN in start_compaction_job), +// so it is intentionally excluded here. +static inline bool is_rowset_compaction_family(TabletCompactionJobPB::CompactionType t) { + return t == TabletCompactionJobPB::BASE || t == TabletCompactionJobPB::CUMULATIVE || + t == TabletCompactionJobPB::EMPTY_CUMULATIVE || t == TabletCompactionJobPB::FULL; +} + +// Whether two compaction jobs MAY conflict on the rowset range (regardless of the actual +// input version range). The caller still needs to compare `input_versions` to make the +// final decision. 
+// +// Conflict matrix (Plan D extends this from same-type-only to cross-type within the family): +// FULL vs anything-in-family : true (full compaction touches the whole rowset range) +// BASE vs BASE / CUMULATIVE : true (their rowset ranges may overlap around cumu_point) +// CUMU vs CUMU / BASE : true (symmetric of the above) +// EMPTY_CU vs anything-in-family : true (it advances cumu_point and would race with the others) +// +// Any compaction type outside this family (or that we don't yet model) is conservatively NOT +// considered conflicting here - if a new type is added later, the author MUST revisit this +// function and decide its conflict semantics explicitly, instead of silently inheriting the +// "everything conflicts" behaviour. +static inline bool may_conflict_by_type(TabletCompactionJobPB::CompactionType a, + TabletCompactionJobPB::CompactionType b) { + return is_rowset_compaction_family(a) && is_rowset_compaction_family(b); +} + void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss, std::unique_ptr& txn, const StartTabletJobRequest* request, StartTabletJobResponse* response, std::string& instance_id, @@ -250,10 +299,16 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst compaction.input_versions().empty()) || (compaction.has_check_input_versions_range() && !compaction.check_input_versions_range())) { - // Unknown input version range, doesn't support parallel compaction of same type + // Unknown input version range, doesn't support parallel compaction of same family. + // EMPTY_CUMULATIVE is normalized to CUMULATIVE here so it conflicts with an in-flight + // CUMULATIVE on the same tablet (otherwise EMPTY_CUMULATIVE could advance + // cumulative_point past the in-flight cumu's input range and let base compaction race + // with it). 
for (auto& c : compactions) { - if (c.type() != compaction.type() && c.type() != TabletCompactionJobPB::FULL) + if (!is_same_conflict_family(c.type(), compaction.type()) && + c.type() != TabletCompactionJobPB::FULL) { continue; + } if (c.id() == compaction.id()) return; // Same job, return OK to keep idempotency msg = fmt::format("compaction has already started, tablet_id={} job={}", tablet_id, proto_to_json(c)); @@ -270,8 +325,14 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst a.input_versions(1) < b.input_versions(0); }; for (auto& c : compactions) { - if (c.type() != compaction.type() && c.type() != TabletCompactionJobPB::FULL) - continue; + // Plan D: BASE and CUMULATIVE on the same tablet may also conflict when their + // input version ranges overlap. Previously we only checked same-type conflicts, + // which left a window where BASE could be accepted with versions that overlap an + // in-flight CUMULATIVE (and vice versa) after cumulative_point was unsafely + // advanced. Now we treat any pair within (BASE, CUMULATIVE, EMPTY_CUMULATIVE, + // FULL) as potentially conflicting and rely on the input-version-range check + // below to make the final decision. 
+ if (!may_conflict_by_type(c.type(), compaction.type())) continue; if (c.input_versions_size() > 0 && version_not_conflict(c, compaction)) continue; if (c.id() == compaction.id()) return; // Same job, return OK to keep idempotency msg = fmt::format("compaction has already started, tablet_id={} job={}", tablet_id, @@ -279,14 +340,21 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst code = MetaServiceCode::JOB_TABLET_BUSY; // Unknown version range of started compaction, BE should not retry other version range if (c.input_versions_size() == 0) return; - // Notify version ranges in started compaction to BE, so BE can retry other version range + // Notify version ranges of all in-flight compactions that may conflict with the + // incoming one, so BE can retry on a non-overlapping range. The notification + // predicate is intentionally kept consistent with the conflict predicate above + // (`may_conflict_by_type`); previously only same-family ranges were surfaced, + // which left BE blind to cross-family (BASE vs CUMULATIVE) conflicts. + // + // An in-flight EMPTY_CUMULATIVE (or any other family member without a concrete + // [v_lo, v_hi]) carries no usable range; surfacing fabricated zeros would + // mislead BE retry. Skip such entries defensively here - the real conflict is + // already enforced by the version-range check above. 
for (auto& c : compactions) { - if (c.type() == compaction.type() || c.type() == TabletCompactionJobPB::FULL) { - // If there are multiple started compaction of same type, they all must has input version range - DCHECK_EQ(c.input_versions_size(), 2) << proto_to_json(c); - response->add_version_in_compaction(c.input_versions(0)); - response->add_version_in_compaction(c.input_versions(1)); - } + if (!may_conflict_by_type(c.type(), compaction.type())) continue; + if (c.input_versions_size() != 2) continue; + response->add_version_in_compaction(c.input_versions(0)); + response->add_version_in_compaction(c.input_versions(1)); } return; } diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp index 1926f6c600a54c..089418745b3c89 100644 --- a/cloud/test/meta_service_job_test.cpp +++ b/cloud/test/meta_service_job_test.cpp @@ -4610,6 +4610,221 @@ TEST(MetaServiceJobTest, ParallelCumuCompactionTest) { ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } +// Plan A regression test: EMPTY_CUMULATIVE must be considered the same conflict family as +// CUMULATIVE so that an EMPTY_CUMULATIVE submitted while a real CUMULATIVE is still active on the +// same tablet is rejected with JOB_TABLET_BUSY. Otherwise EMPTY_CUMULATIVE could advance +// cumulative_point past the in-flight cumu's input range and let base compaction race with it. 
+TEST(MetaServiceJobTest, EmptyCumulativeBlockedByCumulativeTest) { + auto meta_service = get_meta_service(); + + auto sp = SyncPoint::get_instance(); + DORIS_CLOUD_DEFER { + SyncPoint::get_instance()->clear_all_call_backs(); + }; + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + constexpr int64_t table_id = 1; + constexpr int64_t index_id = 2; + constexpr int64_t partition_id = 3; + constexpr int64_t tablet_id = 4; + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false)); + + // Helper to start an EMPTY_CUMULATIVE job. EMPTY_CUMULATIVE has no input_versions and no + // expiration (only cumulative_point/cumulative_compaction_cnt are bumped), which lets it + // bypass `STALE_TABLET_CACHE` when both sides carry the same cumulative_compaction_cnt. + auto start_empty_cumu = [&](const std::string& job_id, const std::string& initiator, + int base_cnt, int cumu_cnt, StartTabletJobResponse& res) { + brpc::Controller cntl; + StartTabletJobRequest req; + req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); + auto* compaction = req.mutable_job()->add_compaction(); + compaction->set_id(job_id); + compaction->set_initiator(initiator); + compaction->set_base_compaction_cnt(base_cnt); + compaction->set_cumulative_compaction_cnt(cumu_cnt); + compaction->set_type(TabletCompactionJobPB::EMPTY_CUMULATIVE); + long now = ::time(nullptr); + compaction->set_lease(now + 3); + meta_service->start_tablet_job(&cntl, &req, &res, nullptr); + }; + + // Step 1: An in-flight CUMULATIVE job [42326-42474] is registered first (mimics the + // scenario from the production log). 
+ StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "cumu1", "BE1", 0, 0, + TabletCompactionJobPB::CUMULATIVE, res, {42326, 42474}); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + // Step 2: An EMPTY_CUMULATIVE arrives carrying the same cumulative_compaction_cnt as + // cumu1. Before the fix this was wrongly accepted because MS only compared raw enum types. + // After the fix, EMPTY_CUMULATIVE must be normalized to CUMULATIVE for conflict detection + // and rejected as JOB_TABLET_BUSY. + res.Clear(); + start_empty_cumu("empty1", "BE1", 0, 0, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg(); + // EMPTY_CUMULATIVE has no input_versions, so BE must NOT receive any version range hint + // (the BE retry on `version_in_compaction` is meaningless for EMPTY_CUMULATIVE). + EXPECT_EQ(res.version_in_compaction_size(), 0); + + // Step 3: Idempotency check - the same job_id submitted twice should still return OK. + res.Clear(); + start_compaction_job(meta_service.get(), tablet_id, "cumu1", "BE1", 0, 0, + TabletCompactionJobPB::CUMULATIVE, res, {42326, 42474}); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + // Step 4: A BASE compaction arrives via the same code path used by EMPTY_CUMULATIVE - + // i.e. without `input_versions`. Because is_same_conflict_family(BASE, CUMULATIVE) is + // false, BASE should still be accepted on this branch (the cross-family conflict is + // enforced only on the version-range branch validated by Plan D test below). + res.Clear(); + start_compaction_job(meta_service.get(), tablet_id, "base1", "BE1", 0, 0, + TabletCompactionJobPB::BASE, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + // Step 5: A second EMPTY_CUMULATIVE should also be rejected by the now-active CUMULATIVE. + // (Even though job_pb already contains an EMPTY_CUMULATIVE-equivalent, the same-family + // check primarily catches the CUMULATIVE side here.) 
+ res.Clear(); + start_empty_cumu("empty2", "BE2", 0, 0, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg(); +} + +// Plan D regression test: when a CUMULATIVE compaction is already running with `input_versions` +// and `check_input_versions_range = true`, a BASE compaction whose version range overlaps with +// the in-flight CUMULATIVE must be rejected with JOB_TABLET_BUSY. Non-overlapping BASE jobs are +// still allowed, which is the typical safe case (BASE handles [0, cumu_point - 1] while +// CUMULATIVE handles versions above cumu_point). +TEST(MetaServiceJobTest, BaseCumulativeCrossTypeConflictTest) { + auto meta_service = get_meta_service(); + + auto sp = SyncPoint::get_instance(); + DORIS_CLOUD_DEFER { + SyncPoint::get_instance()->clear_all_call_backs(); + }; + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + constexpr int64_t table_id = 1; + constexpr int64_t index_id = 2; + constexpr int64_t partition_id = 3; + constexpr int64_t tablet_id = 4; + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false)); + + // Local helper: start a BASE compaction request that carries `input_versions` (matching + // production BE behaviour: cloud_base_compaction.cpp always calls add_input_versions). + // Note: BASE does NOT call set_check_input_versions_range, so it's left as default false + // BUT input_versions is non-empty - this routes the request into the "has input_versions" + // branch on MS, which is the branch Plan D guards. 
+ auto start_base = [&](const std::string& job_id, const std::string& initiator, int base_cnt, + int cumu_cnt, std::pair versions, + StartTabletJobResponse& res) { + brpc::Controller cntl; + StartTabletJobRequest req; + req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); + auto* compaction = req.mutable_job()->add_compaction(); + compaction->set_id(job_id); + compaction->set_initiator(initiator); + compaction->set_base_compaction_cnt(base_cnt); + compaction->set_cumulative_compaction_cnt(cumu_cnt); + compaction->set_type(TabletCompactionJobPB::BASE); + long now = ::time(nullptr); + compaction->set_expiration(now + 12); + compaction->set_lease(now + 3); + compaction->add_input_versions(versions.first); + compaction->add_input_versions(versions.second); + // Intentionally NOT calling set_check_input_versions_range - BASE relies on the + // default false to mimic real BE behaviour. + meta_service->start_tablet_job(&cntl, &req, &res, nullptr); + }; + + // Step 1: A CUMULATIVE compaction with versions [10, 20] is started with parallel-cumu + // mode enabled (check_input_versions_range = true). This routes into the + // version-range-aware branch on MS. + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "cumu1", "BE1", 0, 0, + TabletCompactionJobPB::CUMULATIVE, res, {10, 20}); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + // Step 2: BASE [5, 15] overlaps with CUMULATIVE [10, 20]. Plan D requires this to be + // rejected. Before the fix it would succeed (because the old `c.type() != compaction.type()` + // check skipped the active CUMULATIVE for a BASE submission). + res.Clear(); + start_base("base_overlap_left", "BE1", 0, 0, {5, 15}, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg(); + + // Step 3: BASE [15, 25] also overlaps. Should be rejected. 
+ res.Clear(); + start_base("base_overlap_right", "BE1", 0, 0, {15, 25}, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg(); + + // Step 4: BASE [12, 18] is fully contained inside CUMULATIVE's range. Should be rejected. + res.Clear(); + start_base("base_overlap_inside", "BE1", 0, 0, {12, 18}, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg(); + + // Step 5: BASE [5, 25] fully covers the CUMULATIVE range. Should be rejected. + res.Clear(); + start_base("base_overlap_cover", "BE1", 0, 0, {5, 25}, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg(); + + // Step 6: BASE [0, 9] is BELOW the CUMULATIVE range. This is the typical safe case + // (base handles [0, cumu_point - 1]) and must still be accepted after Plan D. + res.Clear(); + start_base("base_safe_below", "BE1", 0, 0, {0, 9}, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + + // Step 7: A second BASE [21, 30] is ABOVE the CUMULATIVE range AND non-overlapping with the + // already-accepted base_safe_below [0, 9]. This is also a safe non-overlap case - although + // unusual in production (BASE rarely operates above cumu_point), MS should accept it. + res.Clear(); + start_base("base_safe_above", "BE2", 0, 0, {21, 30}, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + + // Step 8: A new CUMULATIVE [22, 28] overlaps with the just-accepted base_safe_above and + // must be rejected. Verifies the conflict is symmetric - CUMULATIVE submissions also + // see BASE jobs as conflicting. 
+ res.Clear(); + start_compaction_job(meta_service.get(), tablet_id, "cumu_overlap_base", "BE1", 0, 0, + TabletCompactionJobPB::CUMULATIVE, res, {22, 28}); + ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg(); + // The version_in_compaction notification predicate is kept consistent with the conflict + // predicate (`may_conflict_by_type`): every in-flight job in the rowset compaction family + // (BASE / CUMULATIVE) is surfaced so BE can pick a non-overlapping range to retry. + // Active jobs at this point: cumu1[10,20], base_safe_below[0,9], base_safe_above[21,30]. + // All three carry concrete input_versions so all three must be reported. + ASSERT_EQ(res.version_in_compaction_size(), 6); + EXPECT_EQ(res.version_in_compaction(0), 10); + EXPECT_EQ(res.version_in_compaction(1), 20); + EXPECT_EQ(res.version_in_compaction(2), 0); + EXPECT_EQ(res.version_in_compaction(3), 9); + EXPECT_EQ(res.version_in_compaction(4), 21); + EXPECT_EQ(res.version_in_compaction(5), 30); + + // Step 9: A new CUMULATIVE [30, 35] does not overlap with cumu1 [10, 20] but DOES overlap + // with base_safe_above [21, 30] (sharing version 30). Must be rejected. + res.Clear(); + start_compaction_job(meta_service.get(), tablet_id, "cumu_overlap_base2", "BE1", 0, 0, + TabletCompactionJobPB::CUMULATIVE, res, {30, 35}); + ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg(); + + // Step 10: A new CUMULATIVE [31, 40] is fully above all active jobs and must be accepted. + res.Clear(); + start_compaction_job(meta_service.get(), tablet_id, "cumu_safe_above", "BE1", 0, 0, + TabletCompactionJobPB::CUMULATIVE, res, {31, 40}); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); +} + TEST(MetaServiceJobTest, SchemaChangeJobPersistTest) { auto meta_service = get_meta_service();