Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sorock/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ fn main() {
.bytes(".sorock.SnapshotChunk.data")
.file_descriptor_set_path(out_dir.join("sorock_descriptor.bin"))
.compile_protos(
&["proto/sorock.proto", "proto-ext/sorock_monitor.proto"],
&["proto", "proto-ext"],
&["proto/sorock.proto"],
&["proto"],
)
.unwrap();
}
26 changes: 0 additions & 26 deletions sorock/proto-ext/sorock_monitor.proto

This file was deleted.

18 changes: 18 additions & 0 deletions sorock/proto/sorock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,40 @@ message RemoveServerRequest {
string server_id = 2;
}

message Shard {
uint32 id = 1;
}

message Membership {
repeated string members = 1;
}

// On receiving this request, a server starts a new election
// to become a leader disregarding the election timeout.
// You can use this request to rebalance the leaders in the cluster.
message TimeoutNow {
uint32 shard_id = 1;
}

message LogMetrics {
uint64 head_index = 1;
uint64 snap_index = 2;
uint64 app_index = 3;
uint64 commit_index = 4;
uint64 last_index = 5;
}

service Raft {
rpc Write(WriteRequest) returns (Response);
rpc Read(ReadRequest) returns (Response);
rpc ProcessKernRequest (KernRequest) returns (google.protobuf.Empty);
rpc RequestVote (VoteRequest) returns (VoteResponse);
rpc AddServer (AddServerRequest) returns (google.protobuf.Empty);
rpc RemoveServer (RemoveServerRequest) returns (google.protobuf.Empty);
rpc GetMembership(Shard) returns (Membership) {}
rpc SendReplicationStream (stream ReplicationStreamChunk) returns (ReplicationStreamResponse);
rpc GetSnapshot (GetSnapshotRequest) returns (stream SnapshotChunk);
rpc SendHeartbeat (Heartbeat) returns (google.protobuf.Empty);
rpc SendTimeoutNow (TimeoutNow) returns (google.protobuf.Empty);
rpc WatchLogMetrics(Shard) returns (stream LogMetrics) {}
}
1 change: 0 additions & 1 deletion sorock/src/service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::*;

pub mod monitor;
pub mod raft;
pub mod reflection;
68 changes: 0 additions & 68 deletions sorock/src/service/monitor.rs

This file was deleted.

50 changes: 50 additions & 0 deletions sorock/src/service/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod raft {

use process::*;
use raft::raft_server::{Raft, RaftServer};
use std::pin::Pin;

pub mod client;
pub(crate) mod communicator;
Expand Down Expand Up @@ -158,6 +159,23 @@ impl raft::raft_server::Raft for RaftService {
Ok(tonic::Response::new(()))
}

async fn get_membership(
&self,
req: tonic::Request<raft::Shard>,
) -> std::result::Result<tonic::Response<raft::Membership>, tonic::Status> {
let shard_id = req.into_inner().id;
let process = self
.node
.get_process(shard_id)
.context(Error::ProcessNotFound(shard_id))
.unwrap();
let members = process.get_membership().await.unwrap().members;
let out = raft::Membership {
members: members.into_iter().map(|x| x.to_string()).collect(),
};
Ok(tonic::Response::new(out))
}

async fn send_replication_stream(
&self,
request: tonic::Request<tonic::Streaming<raft::ReplicationStreamChunk>>,
Expand Down Expand Up @@ -233,4 +251,36 @@ impl raft::raft_server::Raft for RaftService {
.unwrap();
Ok(tonic::Response::new(()))
}

type WatchLogMetricsStream =
Pin<Box<dyn Stream<Item = Result<raft::LogMetrics, tonic::Status>> + Send>>;

async fn watch_log_metrics(
&self,
req: tonic::Request<raft::Shard>,
) -> std::result::Result<tonic::Response<Self::WatchLogMetricsStream>, tonic::Status> {
let shard_id = req.into_inner().id;
let node = self.node.clone();
let st = async_stream::try_stream! {
let mut intvl = tokio::time::interval(std::time::Duration::from_secs(1));
loop {
intvl.tick().await;

let process = node
.get_process(shard_id)
.context(Error::ProcessNotFound(shard_id))
.unwrap();
let log_state = process.get_log_state().await.unwrap();
let metrics = raft::LogMetrics {
head_index: log_state.head_index,
snap_index: log_state.snap_index,
app_index: log_state.app_index,
commit_index: log_state.commit_index,
last_index: log_state.last_index,
};
yield metrics
}
};
Ok(tonic::Response::new(Box::pin(st)))
}
}
2 changes: 0 additions & 2 deletions tests/env/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ impl Node {
let raft_svc = sorock::service::raft::new(node.clone())
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd);
let monitor_svc = sorock::service::monitor::new(node);
let reflection_svc = sorock::service::reflection::new();
let ping_svc = testapp::ping_app::new_service();

Expand All @@ -75,7 +74,6 @@ impl Node {
let mut builder = tonic::transport::Server::builder();
builder
.add_service(raft_svc)
.add_service(monitor_svc)
.add_service(reflection_svc)
.add_service(ping_svc)
.serve_with_shutdown(socket, async {
Expand Down
Loading