Skip to content

Commit 00a21fd

Browse files
optimize: Use lockfree queue in heartbeat queue
1 parent 22f1fac commit 00a21fd

File tree

3 files changed

+17
-23
lines changed

3 files changed

+17
-23
lines changed

sorock/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ derive_more = { version = "1", features = ["full"] }
2020
flume = "0.11.0"
2121
futures.workspace = true
2222
http-serde = "2"
23+
lockfree = "0.5.1"
2324
moka = { version = "0.12", features = ["sync"] }
2425
oneshot = "0.1.7"
2526
phi-detector = "0.4"
@@ -43,4 +44,4 @@ tokio = { version = "1", features = ["full"] }
4344
[build-dependencies]
4445
prost-build.workspace = true
4546
protox.workspace = true
46-
tonic-build.workspace = true
47+
tonic-build.workspace = true

sorock/src/service/raft/communicator/heartbeat_multiplex.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,35 @@
11
use super::*;
22

3-
use spin::Mutex;
43
use std::collections::HashMap;
54

65
pub struct HeartbeatBuffer {
7-
buf: HashMap<ShardId, request::Heartbeat>,
6+
buf: lockfree::queue::Queue<(ShardId, request::Heartbeat)>,
87
}
98
impl HeartbeatBuffer {
109
pub fn new() -> Self {
1110
Self {
12-
buf: HashMap::new(),
11+
buf: lockfree::queue::Queue::new(),
1312
}
1413
}
1514

16-
pub fn push(&mut self, shard_id: ShardId, req: request::Heartbeat) {
17-
self.buf.insert(shard_id, req);
15+
pub fn push(&self, shard_id: ShardId, req: request::Heartbeat) {
16+
self.buf.push((shard_id, req));
1817
}
1918

20-
fn drain(&mut self) -> HashMap<ShardId, request::Heartbeat> {
21-
self.buf.drain().collect()
19+
fn drain(&self) -> HashMap<ShardId, request::Heartbeat> {
20+
let mut out = HashMap::new();
21+
for (k, v) in self.buf.pop_iter() {
22+
out.insert(k, v);
23+
}
24+
out
2225
}
2326
}
2427

25-
pub async fn run(
26-
buf: Arc<Mutex<HeartbeatBuffer>>,
27-
mut cli: raft::RaftClient,
28-
self_node_id: NodeId,
29-
) {
28+
pub async fn run(buf: Arc<HeartbeatBuffer>, mut cli: raft::RaftClient, self_node_id: NodeId) {
3029
loop {
3130
tokio::time::sleep(Duration::from_millis(300)).await;
3231

33-
let heartbeats = {
34-
let mut buf = buf.lock();
35-
let out = buf.drain();
36-
out
37-
};
32+
let heartbeats = buf.drain();
3833

3934
let states = {
4035
let mut out = HashMap::new();

sorock/src/service/raft/communicator/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ mod heartbeat_multiplex;
44
mod stream;
55

66
use heartbeat_multiplex::*;
7-
use process::*;
8-
use spin::Mutex;
97
use std::sync::Arc;
108
use tokio::task::AbortHandle;
119

@@ -19,7 +17,7 @@ impl Drop for HandleDrop {
1917
#[derive(Clone)]
2018
pub struct RaftConnection {
2119
client: raft::RaftClient,
22-
heartbeat_buffer: Arc<Mutex<HeartbeatBuffer>>,
20+
heartbeat_buffer: Arc<HeartbeatBuffer>,
2321
_abort_hdl: Arc<HandleDrop>,
2422
}
2523
impl RaftConnection {
@@ -35,7 +33,7 @@ impl RaftConnection {
3533
raft::RaftClient::new(chan)
3634
};
3735

38-
let heartbeat_buffer = Arc::new(Mutex::new(HeartbeatBuffer::new()));
36+
let heartbeat_buffer = Arc::new(HeartbeatBuffer::new());
3937

4038
let fut = heartbeat_multiplex::run(heartbeat_buffer.clone(), client.clone(), self_node_id);
4139
let fut = tokio::task::unconstrained(fut);
@@ -77,7 +75,7 @@ impl Communicator {
7775
}
7876

7977
pub fn queue_heartbeat(&self, req: request::Heartbeat) {
80-
self.conn.heartbeat_buffer.lock().push(self.shard_id, req);
78+
self.conn.heartbeat_buffer.push(self.shard_id, req);
8179
}
8280

8381
pub async fn process_user_write_request(

0 commit comments

Comments
 (0)