Skip to content

Commit e932486

Browse files
Add sorock-check (#463)
1 parent 1ccb725 commit e932486

File tree

18 files changed

+866
-5
lines changed

18 files changed

+866
-5
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
resolver = "2"
33
members = [
44
"sorock",
5+
"sorock-check",
56
"tests/*",
67
]
78

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ A Multi-Raft implementation in Rust language.
2525

2626
## Related Projects
2727

28-
- [sorock-monitor](https://github.com/akiradeveloper/sorock-monitor): Monitoring tool to watch the log state in a cluster. Implementing using [ratatui](https://github.com/ratatui/ratatui).
2928
- [phi-detector](https://github.com/akiradeveloper/phi-detector): Implementation of Phi Accrual Failure Detector in Rust.
3029

3130
## Author

sorock-check/Cargo.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "sorock-check"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
anyhow.workspace = true
8+
async-stream = "0.3.6"
9+
async-trait.workspace = true
10+
clap = { version = "4.5.20", features = ["derive"] }
11+
crossbeam = "0.8.4"
12+
futures.workspace = true
13+
prost.workspace = true
14+
rand.workspace = true
15+
ratatui = "0.28.1"
16+
spin.workspace = true
17+
tokio = { workspace = true, features = ["full"] }
18+
tonic.workspace = true
19+
tonic-prost.workspace = true
20+
tui-widget-list = "0.12.2"
21+
22+
[build-dependencies]
23+
tonic-prost-build.workspace = true

sorock-check/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# sorock-check
2+
3+
A lightweight tool to troubleshoot Raft clusters by visualizing the cluster and the log progress.
4+
5+
https://github.com/user-attachments/assets/9aff6794-778b-48fa-bfbd-838e63b3e5c8
6+
7+
## Usage
8+
9+
`sorock-check monitor $URL $SHARD_ID`. (e.g. `sorock-check monitor http://node5:50051 34`)
10+
11+
Once connected to any node in a cluster,
12+
the program will automatically connect to all nodes in the cluster.

sorock-check/build.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
fn main() {
2+
tonic_prost_build::configure()
3+
.compile_protos(&["sorock.proto"], &["../sorock/proto"])
4+
.unwrap();
5+
}

sorock-check/src/main.rs

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
use anyhow::Result;
2+
use spin::RwLock;
3+
use std::collections::{BTreeMap, HashMap, HashSet};
4+
use std::sync::Arc;
5+
use std::{io, vec};
6+
7+
use clap::Parser;
8+
use futures::Stream;
9+
use futures::StreamExt;
10+
use ratatui::prelude::*;
11+
use ratatui::widgets::{Block, Gauge};
12+
use ratatui::{
13+
crossterm::event::{self, KeyCode, KeyEventKind},
14+
widgets::{Axis, Borders, Chart, Dataset, GraphType, StatefulWidget, Widget},
15+
DefaultTerminal,
16+
};
17+
use std::pin::Pin;
18+
use std::time::{Duration, Instant};
19+
use tonic::transport::{Channel, Endpoint, Uri};
20+
21+
mod mock;
22+
mod model;
23+
mod real;
24+
mod ui;
25+
26+
mod proto {
27+
tonic::include_proto!("sorock");
28+
}
29+
30+
#[derive(Parser)]
31+
enum Sub {
32+
#[clap(about = "Start monitoring a cluster by connecting to a node.")]
33+
Monitor { addr: Uri, shard_id: u32 },
34+
#[clap(about = "Embedded test. 0 -> Static data, 1 -> Mock servers")]
35+
TestMonitor { number: u8 },
36+
}
37+
38+
#[derive(Parser)]
39+
struct Args {
40+
#[clap(subcommand)]
41+
sub: Sub,
42+
}
43+
44+
#[tokio::main]
45+
async fn main() -> Result<()> {
46+
let args = Args::parse();
47+
48+
let model = match args.sub {
49+
Sub::Monitor { addr, shard_id } => {
50+
let node = real::connect_real_node(addr, shard_id);
51+
model::Model::new(node).await
52+
}
53+
Sub::TestMonitor { number: 0 } => model::Model::test(),
54+
Sub::TestMonitor { number: 1 } => {
55+
let mock = mock::connect_mock_node();
56+
model::Model::new(mock).await
57+
}
58+
_ => unreachable!(),
59+
};
60+
61+
let mut terminal = ratatui::init();
62+
let app_result = App::new(model).run(&mut terminal)?;
63+
terminal.clear()?;
64+
ratatui::restore();
65+
66+
Ok(app_result)
67+
}
68+
69+
struct App {
70+
model: model::Model,
71+
}
72+
impl App {
73+
pub fn new(model: model::Model) -> Self {
74+
Self { model }
75+
}
76+
77+
fn run(self, terminal: &mut DefaultTerminal) -> io::Result<()> {
78+
let mut app_state = AppState::default();
79+
loop {
80+
terminal.draw(|frame| {
81+
frame.render_stateful_widget(&self, frame.area(), &mut app_state);
82+
})?;
83+
84+
if !event::poll(Duration::from_millis(100))? {
85+
continue;
86+
}
87+
88+
if let event::Event::Key(key) = event::read()? {
89+
if key.kind == KeyEventKind::Press {
90+
match key.code {
91+
KeyCode::Char('q') => return Ok(()),
92+
KeyCode::Up | KeyCode::Char('k') => app_state.list_state.previous(),
93+
KeyCode::Down | KeyCode::Char('j') => app_state.list_state.next(),
94+
_ => {}
95+
}
96+
}
97+
}
98+
}
99+
}
100+
}
101+
102+
#[derive(Default)]
103+
struct AppState {
104+
list_state: tui_widget_list::ListState,
105+
}
106+
impl StatefulWidget for &App {
107+
type State = AppState;
108+
fn render(
109+
self,
110+
area: ratatui::prelude::Rect,
111+
buf: &mut ratatui::prelude::Buffer,
112+
state: &mut Self::State,
113+
) where
114+
Self: Sized,
115+
{
116+
let chunks = Layout::default()
117+
.direction(Direction::Vertical)
118+
.margin(1)
119+
.constraints([Constraint::Length(15), Constraint::Fill(1)].as_ref())
120+
.split(area);
121+
122+
let progress_chart = {
123+
let end = Instant::now();
124+
let start = end - Duration::from_secs(120);
125+
let data = self.model.progress_log.read().get_range(start, end);
126+
ui::progress_chart::ProgressChart::new(data, start, end)
127+
};
128+
Widget::render(progress_chart, chunks[0], buf);
129+
130+
let nodes_list = {
131+
let mut nodes = vec![];
132+
let reader = &self.model.nodes.read();
133+
134+
let min_index = reader
135+
.nodes
136+
.values()
137+
.map(|node_state| node_state.log_state.head_index)
138+
.min()
139+
.unwrap_or(0);
140+
let max_index = reader
141+
.nodes
142+
.values()
143+
.map(|node_state| node_state.log_state.last_index)
144+
.max()
145+
.unwrap_or(0);
146+
147+
for (uri, node_state) in &reader.nodes {
148+
let log_state = &node_state.log_state;
149+
nodes.push(ui::node_list::Node {
150+
name: uri.to_string(),
151+
head_index: log_state.head_index,
152+
snapshot_index: log_state.snapshot_index,
153+
app_index: log_state.app_index,
154+
commit_index: log_state.commit_index,
155+
last_index: log_state.last_index,
156+
min_max: ui::node_list::IndexRange {
157+
min_index,
158+
max_index,
159+
},
160+
});
161+
}
162+
nodes.sort_by_key(|node| node.name.clone());
163+
ui::node_list::NodeList::new(nodes)
164+
};
165+
StatefulWidget::render(nodes_list, chunks[1], buf, &mut state.list_state);
166+
}
167+
}

sorock-check/src/mock.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use super::*;
2+
3+
use futures::stream::Stream;
4+
use std::time::Instant;
5+
use tonic::transport::Uri;
6+
7+
/// A stand-in for a real cluster node, used by the embedded mock test.
pub struct MockNode {
    /// Moment this mock was created; the synthetic log metrics are derived
    /// from the time elapsed since then.
    start_time: Instant,
}

impl MockNode {
    /// Creates a mock node whose clock starts now.
    pub fn new() -> Self {
        let start_time = Instant::now();
        MockNode { start_time }
    }
}
17+
18+
#[async_trait::async_trait]
19+
impl model::stream::Node for MockNode {
20+
async fn watch_membership(&self) -> Pin<Box<dyn Stream<Item = proto::Membership> + Send>> {
21+
let out = proto::Membership {
22+
members: vec![
23+
"http://n1:4000".to_string(),
24+
"http://n2:4000".to_string(),
25+
"http://n3:4000".to_string(),
26+
"http://n4:4000".to_string(),
27+
"http://n5:4000".to_string(),
28+
"http://n6:4000".to_string(),
29+
"http://n7:4000".to_string(),
30+
"http://n8:4000".to_string(),
31+
],
32+
};
33+
Box::pin(futures::stream::once(async move { out }))
34+
}
35+
36+
async fn watch_log_metrics(
37+
&self,
38+
_: Uri,
39+
) -> Pin<Box<dyn Stream<Item = proto::LogMetrics> + Send>> {
40+
let start_time = self.start_time;
41+
let st = async_stream::stream! {
42+
loop {
43+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
44+
let x = Instant::now().duration_since(start_time).as_secs();
45+
// f(x) = x^2 * log(x)
46+
// is a monotonically increasing function
47+
let f = |x: u64| {
48+
let a = f64::powf(x as f64, 2.);
49+
let b = f64::log(10.0, x as f64);
50+
(a * b) as u64
51+
};
52+
let metrics = proto::LogMetrics {
53+
head_index: f(x),
54+
snap_index: f(x+1),
55+
app_index: f(x+2),
56+
commit_index: f(x+3),
57+
last_index: f(x+4),
58+
};
59+
yield metrics
60+
}
61+
};
62+
Box::pin(st)
63+
}
64+
}
65+
66+
/// Creates the mock node used by the embedded mock-server test.
pub fn connect_mock_node() -> impl model::stream::Node {
    MockNode::new()
}

sorock-check/src/model/mod.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use super::*;
2+
3+
mod nodes;
4+
mod progress_log;
5+
pub mod stream;
6+
pub use nodes::*;
7+
pub use progress_log::*;
8+
9+
pub struct Model {
10+
pub nodes: Arc<RwLock<Nodes>>,
11+
pub progress_log: Arc<RwLock<ProgressLog>>,
12+
}
13+
impl Model {
14+
pub async fn new(node: impl stream::Node + 'static) -> Self {
15+
let node = Arc::new(node);
16+
let nodes = Arc::new(RwLock::new(Nodes::default()));
17+
let progress_log = Arc::new(RwLock::new(ProgressLog::new()));
18+
19+
tokio::spawn({
20+
let node = node.watch_membership().await;
21+
let nodes = nodes.clone();
22+
async move {
23+
stream::CopyMembership::copy(node, nodes).await;
24+
}
25+
});
26+
27+
tokio::spawn({
28+
let node = node.clone();
29+
let nodes = nodes.clone();
30+
async move {
31+
loop {
32+
tokio::time::sleep(Duration::from_secs(1)).await;
33+
nodes::start_copying(node.clone(), nodes.clone());
34+
}
35+
}
36+
});
37+
38+
tokio::spawn({
39+
let nodes = nodes.clone();
40+
let progress_log = progress_log.clone();
41+
async move {
42+
loop {
43+
progress_log::copy(nodes.clone(), progress_log.clone());
44+
tokio::time::sleep(Duration::from_secs(1)).await;
45+
}
46+
}
47+
});
48+
49+
Self {
50+
nodes,
51+
progress_log,
52+
}
53+
}
54+
55+
pub fn test() -> Self {
56+
Self {
57+
nodes: Arc::new(RwLock::new(Nodes::test())),
58+
progress_log: Arc::new(RwLock::new(ProgressLog::test())),
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)