Skip to content

Commit abc327f

Browse files
authored
Merge pull request #391 from superfly/disable-assertion
Disable antithesis assertion for number of changes
2 parents a8eda4a + 784c993 commit abc327f

File tree

2 files changed

+24
-28
lines changed

2 files changed

+24
-28
lines changed

crates/corro-agent/src/agent/util.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,12 +1276,14 @@ pub fn process_complete_version<T: Deref<Target = rusqlite::Connection> + Commit
12761276

12771277
debug!(%actor_id, %version, "complete change, applying right away! seqs: {seqs:?}, last_seq: {last_seq}, changes len: {len}, db version: {version}");
12781278

1279-
let details = json!({"len": len, "seqs": seqs.start_int(), "seqs_end": seqs.end_int(), "actor_id": actor_id, "version": version});
1280-
assert_always!(
1281-
len <= seqs.len(),
1282-
"number of changes is equal to the seq len",
1283-
&details
1284-
);
1279+
// TODO: Figure out a better assertion. This assertion is disabled for now to reduce false negatives. We can receive a valid complete changeset
1280+
// where the number of changes is less than the seqs range because some rows have been overridden by a newer update.
1281+
// let details = json!({"len": len, "seqs": seqs.start_int(), "seqs_end": seqs.end_int(), "actor_id": actor_id, "version": version});
1282+
// assert_always!(
1283+
// len <= seqs.len(),
1284+
// "number of changes is equal to the seq len",
1285+
// &details
1286+
// );
12851287
debug_assert!(len <= seqs.len(), "change from actor {actor_id} version {version} has len {len} but seqs range is {seqs:?} and last_seq is {last_seq}");
12861288

12871289
// Insert all the changes in a single statement

crates/corro-agent/src/broadcast/mod.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -723,8 +723,6 @@ async fn handle_broadcasts(
723723
)
724724
};
725725

726-
let pending_sent_instance = pending.sent_to.len();
727-
728726
let mut spawn_count = 0;
729727
trace!("broadcasting to: {:?}", broadcast_to);
730728
for addr in broadcast_to {
@@ -764,26 +762,22 @@ async fn handle_broadcasts(
764762

765763
counter!("corro.broadcast.spawn", "type" => "global").increment(spawn_count);
766764

767-
if pending_sent_instance != pending.sent_to.len() {
768-
// we've sent this to at least 1 member...
769-
770-
if let Some(send_count) = pending.send_count.checked_add(1) {
771-
trace!("send_count: {send_count}, max_transmissions: {max_transmissions}");
772-
pending.send_count = send_count;
773-
774-
if send_count < max_transmissions {
775-
debug!("queueing for re-send");
776-
idle_pendings.push(Box::pin(async move {
777-
// slow our send pace if we've been previously rate limited
778-
let sleep_ms_base = if prev_rate_limited { 500 } else { 100 };
779-
// send with increasing latency as we've already sent the updates out
780-
tokio::time::sleep(Duration::from_millis(
781-
sleep_ms_base * send_count as u64,
782-
))
783-
.await;
784-
pending
785-
}));
786-
}
765+
if let Some(send_count) = pending.send_count.checked_add(1) {
766+
trace!("send_count: {send_count}, max_transmissions: {max_transmissions}");
767+
pending.send_count = send_count;
768+
769+
if send_count < max_transmissions {
770+
debug!("queueing for re-send");
771+
idle_pendings.push(Box::pin(async move {
772+
// slow our send pace if we've been previously rate limited
773+
let sleep_ms_base = if prev_rate_limited { 500 } else { 100 };
774+
// send with increasing latency as we've already sent the updates out
775+
tokio::time::sleep(Duration::from_millis(
776+
sleep_ms_base * send_count as u64,
777+
))
778+
.await;
779+
pending
780+
}));
787781
}
788782
}
789783
}

0 commit comments

Comments
 (0)