diff --git a/sorock/proto/sorock.proto b/sorock/proto/sorock.proto index e42daf5d..074ce8cc 100644 --- a/sorock/proto/sorock.proto +++ b/sorock/proto/sorock.proto @@ -20,6 +20,7 @@ message WriteRequest { message ReadRequest { uint32 shard_id = 1; bytes message = 2; + bool read_locally = 3; } // Response from the `RaftApp`. diff --git a/sorock/src/communicator/mod.rs b/sorock/src/communicator/mod.rs index cce561c3..33a56f2e 100644 --- a/sorock/src/communicator/mod.rs +++ b/sorock/src/communicator/mod.rs @@ -97,6 +97,7 @@ impl Communicator { let req = raft::ReadRequest { shard_id: self.shard_id, message: req.message, + read_locally: req.read_locally, }; let resp = self.conn.client.clone().read(req).await?.into_inner(); Ok(resp.message) diff --git a/sorock/src/generated/sorock.rs b/sorock/src/generated/sorock.rs index dd764ef3..dd2d3905 100644 --- a/sorock/src/generated/sorock.rs +++ b/sorock/src/generated/sorock.rs @@ -21,6 +21,8 @@ pub struct ReadRequest { pub shard_id: u32, #[prost(bytes = "bytes", tag = "2")] pub message: ::prost::bytes::Bytes, + #[prost(bool, tag = "3")] + pub read_locally: bool, } /// Response from the `RaftApp`. #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/sorock/src/generated/sorock_descriptor.bin b/sorock/src/generated/sorock_descriptor.bin index c1225074..7756d134 100644 Binary files a/sorock/src/generated/sorock_descriptor.bin and b/sorock/src/generated/sorock_descriptor.bin differ diff --git a/sorock/src/process/api.rs b/sorock/src/process/api.rs index 0ef06175..4f80eb51 100644 --- a/sorock/src/process/api.rs +++ b/sorock/src/process/api.rs @@ -10,6 +10,7 @@ pub mod request { pub struct UserReadRequest { pub message: Bytes, + pub read_locally: bool, } pub struct KernRequest { diff --git a/sorock/src/process/raft_process/responder.rs b/sorock/src/process/raft_process/responder.rs index ade2134c..a5ddf1fb 100644 --- a/sorock/src/process/raft_process/responder.rs +++ b/sorock/src/process/raft_process/responder.rs @@ -62,10 +62,13 @@ impl RaftProcess { anyhow::bail!(Error::LeaderUnknown) }; - let resp = if std::matches!( - self.voter.read_election_state(), - voter::ElectionState::Leader - ) { + let will_process = req.read_locally + || std::matches!( + self.voter.read_election_state(), + voter::ElectionState::Leader + ); + + let resp = if will_process { let (user_completion, rx) = completion::prepare_user_completion(); let read_index = self.command_log.commit_pointer.load(Ordering::SeqCst); diff --git a/sorock/src/raft_service/mod.rs b/sorock/src/raft_service/mod.rs index 0440cd8d..f4c05853 100644 --- a/sorock/src/raft_service/mod.rs +++ b/sorock/src/raft_service/mod.rs @@ -49,6 +49,7 @@ impl raft::raft_server::Raft for RaftService { let shard_id = req.shard_id; let req = request::UserReadRequest { message: req.message, + read_locally: req.read_locally, }; let resp = self .node diff --git a/tests/testapp/src/lib.rs b/tests/testapp/src/lib.rs index a8435616..babf95d7 100644 --- a/tests/testapp/src/lib.rs +++ b/tests/testapp/src/lib.rs @@ -91,6 +91,7 @@ impl Client { let req = ReadRequest { shard_id, message: AppReadRequest::Read.serialize(), + read_locally: false, }; let resp = self.cli.clone().read(req).await?.into_inner(); let resp = AppState::deserialize(&resp.message); @@ -101,6 +102,7 @@ impl Client { let req = ReadRequest { shard_id, message: AppReadRequest::MakeSnapshot.serialize(), + read_locally: true, }; let resp = self.cli.clone().read(req).await?.into_inner(); let resp = AppState::deserialize(&resp.message);