Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
resolver = "2"
members = [
"sorock",
"sorock-check",
"tests/*",
]

Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ A Multi-Raft implementation in Rust language.

## Related Projects

- [sorock-monitor](https://github.com/akiradeveloper/sorock-monitor): Monitoring tool to watch the log state in a cluster. Implemented using [ratatui](https://github.com/ratatui/ratatui).
- [phi-detector](https://github.com/akiradeveloper/phi-detector): Implementation of Phi Accrual Failure Detector in Rust.

## Author
Expand Down
23 changes: 23 additions & 0 deletions sorock-check/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "sorock-check"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow.workspace = true
async-stream = "0.3.6"
async-trait.workspace = true
clap = { version = "4.5.20", features = ["derive"] }
crossbeam = "0.8.4"
futures.workspace = true
prost.workspace = true
rand.workspace = true
ratatui = "0.28.1"
spin.workspace = true
tokio = { workspace = true, features = ["full"] }
tonic.workspace = true
tonic-prost.workspace = true
tui-widget-list = "0.12.2"

[build-dependencies]
tonic-prost-build.workspace = true
12 changes: 12 additions & 0 deletions sorock-check/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# sorock-check

A lightweight tool to troubleshoot Raft clusters by visualizing the cluster and the log progress.

https://github.com/user-attachments/assets/9aff6794-778b-48fa-bfbd-838e63b3e5c8

## Usage

`sorock-check monitor $URL $SHARD_ID` (e.g. `sorock-check monitor http://node5:50051 34`).

Once connected to any node in a cluster,
the program will automatically connect to all nodes in the cluster.
5 changes: 5 additions & 0 deletions sorock-check/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/// Build script: compiles `sorock.proto` with `tonic_prost_build`, resolving
/// it relative to the `../sorock/proto` include directory.
///
/// Panics (failing the build) if compilation reports an error.
fn main() {
    let protos = ["sorock.proto"];
    let include_dirs = ["../sorock/proto"];

    tonic_prost_build::configure()
        .compile_protos(&protos, &include_dirs)
        .unwrap();
}
167 changes: 167 additions & 0 deletions sorock-check/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use anyhow::Result;
use spin::RwLock;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::{io, vec};

use clap::Parser;
use futures::Stream;
use futures::StreamExt;
use ratatui::prelude::*;
use ratatui::widgets::{Block, Gauge};
use ratatui::{
crossterm::event::{self, KeyCode, KeyEventKind},
widgets::{Axis, Borders, Chart, Dataset, GraphType, StatefulWidget, Widget},
DefaultTerminal,
};
use std::pin::Pin;
use std::time::{Duration, Instant};
use tonic::transport::{Channel, Endpoint, Uri};

mod mock;
mod model;
mod real;
mod ui;

// Code generated at build time from `sorock.proto` (see build.rs), included
// here for the "sorock" protobuf package. The rest of the crate refers to
// the generated types as `proto::Membership`, `proto::LogMetrics`, etc.
mod proto {
tonic::include_proto!("sorock");
}

// Subcommands accepted on the command line; `main` matches on these to
// decide how the model is built.
#[derive(Parser)]
enum Sub {
// Connect to the node at `addr` and build the model for shard `shard_id`.
#[clap(about = "Start monitoring a cluster by connecting to a node.")]
Monitor { addr: Uri, shard_id: u32 },
// Run against built-in data: `main` dispatches on `number`
// (0 -> `Model::test()`, 1 -> a mock node).
#[clap(about = "Embedded test. 0 -> Static data, 1 -> Mock servers")]
TestMonitor { number: u8 },
}

// Top-level command-line arguments; the only content is the selected
// subcommand, which `main` reads via `args.sub`.
#[derive(Parser)]
struct Args {
#[clap(subcommand)]
sub: Sub,
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();

let model = match args.sub {
Sub::Monitor { addr, shard_id } => {
let node = real::connect_real_node(addr, shard_id);
model::Model::new(node).await
}
Sub::TestMonitor { number: 0 } => model::Model::test(),
Sub::TestMonitor { number: 1 } => {
let mock = mock::connect_mock_node();
model::Model::new(mock).await
}
_ => unreachable!(),
};

let mut terminal = ratatui::init();
let app_result = App::new(model).run(&mut terminal)?;
terminal.clear()?;
ratatui::restore();

Ok(app_result)
}

/// The terminal application: owns the model that the widgets render from.
struct App {
    model: model::Model,
}

impl App {
    /// Wraps `model` in an application ready to be run.
    pub fn new(model: model::Model) -> Self {
        Self { model }
    }

    /// Runs the draw/input loop until the user presses `q`.
    ///
    /// Each iteration redraws the whole frame and then waits up to 100 ms
    /// for an input event, so the screen keeps refreshing while idle.
    /// Up/`k` and Down/`j` move the list selection.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from drawing, polling, or reading events.
    fn run(self, terminal: &mut DefaultTerminal) -> io::Result<()> {
        const POLL_INTERVAL: Duration = Duration::from_millis(100);

        let mut state = AppState::default();
        loop {
            terminal.draw(|frame| {
                let area = frame.area();
                frame.render_stateful_widget(&self, area, &mut state);
            })?;

            // Nothing arrived within the interval: go straight to the next redraw.
            if !event::poll(POLL_INTERVAL)? {
                continue;
            }

            // Only key-press events are acted on.
            let event::Event::Key(key) = event::read()? else {
                continue;
            };
            if key.kind != KeyEventKind::Press {
                continue;
            }

            match key.code {
                KeyCode::Char('q') => return Ok(()),
                KeyCode::Up | KeyCode::Char('k') => state.list_state.previous(),
                KeyCode::Down | KeyCode::Char('j') => state.list_state.next(),
                _ => {}
            }
        }
    }
}

/// Mutable UI state handed to `App`'s `StatefulWidget::render` on every draw.
#[derive(Default)]
struct AppState {
/// Selection state of the node list; `App::run` moves it with the
/// Up/`k` and Down/`j` keys.
list_state: tui_widget_list::ListState,
}
impl StatefulWidget for &App {
    type State = AppState;

    /// Draws the whole screen: a 15-row progress chart on top and the node
    /// list filling the remaining space, inside a 1-cell margin.
    ///
    /// The chart covers the 120 seconds leading up to now. Every node row
    /// carries the same overall index range (smallest `head_index` to largest
    /// `last_index` across all nodes; both 0 when there are no nodes). Rows
    /// are ordered by node name.
    fn render(
        self,
        area: ratatui::prelude::Rect,
        buf: &mut ratatui::prelude::Buffer,
        state: &mut Self::State,
    ) where
        Self: Sized,
    {
        let layout = Layout::default()
            .direction(Direction::Vertical)
            .margin(1)
            .constraints([Constraint::Length(15), Constraint::Fill(1)].as_ref())
            .split(area);
        let (chart_area, list_area) = (layout[0], layout[1]);

        // Progress chart over the trailing two-minute window.
        let now = Instant::now();
        let window_start = now - Duration::from_secs(120);
        let samples = self.model.progress_log.read().get_range(window_start, now);
        let chart = ui::progress_chart::ProgressChart::new(samples, window_start, now);
        Widget::render(chart, chart_area, buf);

        // Node list; the read guard lives only as long as this block.
        let list = {
            let guard = self.model.nodes.read();
            let states = &guard.nodes;

            let min_index = states
                .values()
                .map(|s| s.log_state.head_index)
                .min()
                .unwrap_or(0);
            let max_index = states
                .values()
                .map(|s| s.log_state.last_index)
                .max()
                .unwrap_or(0);

            let mut rows: Vec<_> = states
                .iter()
                .map(|(uri, node_state)| {
                    let log = &node_state.log_state;
                    ui::node_list::Node {
                        name: uri.to_string(),
                        head_index: log.head_index,
                        snapshot_index: log.snapshot_index,
                        app_index: log.app_index,
                        commit_index: log.commit_index,
                        last_index: log.last_index,
                        min_max: ui::node_list::IndexRange {
                            min_index,
                            max_index,
                        },
                    }
                })
                .collect();
            rows.sort_by(|a, b| a.name.cmp(&b.name));
            ui::node_list::NodeList::new(rows)
        };
        StatefulWidget::render(list, list_area, buf, &mut state.list_state);
    }
}
69 changes: 69 additions & 0 deletions sorock-check/src/mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use super::*;

use futures::stream::Stream;
use std::time::Instant;
use tonic::transport::Uri;

/// A stand-in node that needs no cluster: it remembers when it was created.
pub struct MockNode {
    /// Creation time of this mock.
    start_time: Instant,
}

impl MockNode {
    /// Creates a mock whose clock starts now.
    pub fn new() -> Self {
        let start_time = Instant::now();
        Self { start_time }
    }
}

#[async_trait::async_trait]
impl model::stream::Node for MockNode {
    /// Returns a stream that yields one fixed membership of eight nodes
    /// (`http://n1:4000` through `http://n8:4000`) and then ends.
    async fn watch_membership(&self) -> Pin<Box<dyn Stream<Item = proto::Membership> + Send>> {
        let out = proto::Membership {
            members: vec![
                "http://n1:4000".to_string(),
                "http://n2:4000".to_string(),
                "http://n3:4000".to_string(),
                "http://n4:4000".to_string(),
                "http://n5:4000".to_string(),
                "http://n6:4000".to_string(),
                "http://n7:4000".to_string(),
                "http://n8:4000".to_string(),
            ],
        };
        Box::pin(futures::stream::once(async move { out }))
    }

    /// Returns an endless stream of synthetic log metrics, one per second.
    ///
    /// The node URI is ignored: every node gets the same curve. With `x` the
    /// whole seconds elapsed since this mock was created, the indices are
    /// `f(x)`, `f(x+1)`, ... `f(x+4)` for head, snapshot, application, commit
    /// and last index respectively, so they never decrease over time and
    /// stay ordered relative to each other.
    async fn watch_log_metrics(
        &self,
        _: Uri,
    ) -> Pin<Box<dyn Stream<Item = proto::LogMetrics> + Send>> {
        let start_time = self.start_time;
        let st = async_stream::stream! {
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                let x = Instant::now().duration_since(start_time).as_secs();
                // f(x) = x^2 * log10(x), non-decreasing for x >= 1.
                // `f64::log(self, base)` takes the base as its argument, so
                // the former `f64::log(10.0, x)` was log base x of 10, which
                // is infinite at x = 1 and made the cast saturate to u64::MAX.
                let f = |x: u64| -> u64 {
                    if x == 0 {
                        // log10(0) is -inf; define f(0) = 0 explicitly.
                        0
                    } else {
                        let x = x as f64;
                        (x.powi(2) * x.log10()) as u64
                    }
                };
                let metrics = proto::LogMetrics {
                    head_index: f(x),
                    snap_index: f(x + 1),
                    app_index: f(x + 2),
                    commit_index: f(x + 3),
                    last_index: f(x + 4),
                };
                yield metrics;
            }
        };
        Box::pin(st)
    }
}

/// Creates a fresh [`MockNode`] and returns it behind the `Node` interface.
pub fn connect_mock_node() -> impl model::stream::Node {
    MockNode::new()
}
61 changes: 61 additions & 0 deletions sorock-check/src/model/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use super::*;

mod nodes;
mod progress_log;
pub mod stream;
pub use nodes::*;
pub use progress_log::*;

/// Shared state behind the UI. Both fields sit behind `Arc<RwLock<..>>`;
/// `Model::new` hands clones of them to the background tasks it spawns.
pub struct Model {
/// State of the known nodes.
pub nodes: Arc<RwLock<Nodes>>,
/// Progress history; `Model::new` spawns a task that calls
/// `progress_log::copy` with it once per second.
pub progress_log: Arc<RwLock<ProgressLog>>,
}
impl Model {
    /// Builds a live model fed by `node` and starts its background tasks.
    ///
    /// Three detached tokio tasks are spawned:
    /// 1. one that hands the membership stream from `node` to
    ///    `stream::CopyMembership::copy` together with the shared `nodes`;
    /// 2. one that sleeps a second and then calls `nodes::start_copying`,
    ///    forever;
    /// 3. one that calls `progress_log::copy` and then sleeps a second,
    ///    forever.
    ///
    /// The membership stream is obtained (awaited) before anything is spawned.
    pub async fn new(node: impl stream::Node + 'static) -> Self {
        let node = Arc::new(node);
        let nodes = Arc::new(RwLock::new(Nodes::default()));
        let progress_log = Arc::new(RwLock::new(ProgressLog::new()));

        // Task 1: membership stream -> `nodes`.
        let membership = node.watch_membership().await;
        let membership_task = {
            let nodes = nodes.clone();
            async move {
                stream::CopyMembership::copy(membership, nodes).await;
            }
        };
        tokio::spawn(membership_task);

        // Task 2: once per second (sleeping first), start copying for the
        // current nodes.
        let refresh_task = {
            let (node, nodes) = (node.clone(), nodes.clone());
            async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    nodes::start_copying(node.clone(), nodes.clone());
                }
            }
        };
        tokio::spawn(refresh_task);

        // Task 3: once per second (copying first), record progress.
        let history_task = {
            let (nodes, progress_log) = (nodes.clone(), progress_log.clone());
            async move {
                loop {
                    progress_log::copy(nodes.clone(), progress_log.clone());
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            }
        };
        tokio::spawn(history_task);

        Self {
            nodes,
            progress_log,
        }
    }

    /// Builds a model from the `Nodes::test()` and `ProgressLog::test()`
    /// fixtures; no background task is spawned.
    pub fn test() -> Self {
        let nodes = Arc::new(RwLock::new(Nodes::test()));
        let progress_log = Arc::new(RwLock::new(ProgressLog::test()));
        Self {
            nodes,
            progress_log,
        }
    }
}
Loading
Loading