Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ jobs:
- name: Test (vcan)
if: steps.vcan.outputs.available == 'true'
run: cargo llvm-cov --no-report nextest --all-features --run-ignored ignored-only
# Run clippy twice - once with the 1.89 MSRV, and once with the latest stable toolchain
- name: Clippy
run: cargo clippy --no-deps --all-targets --all-features
- name: Coverage report
run: |
cargo llvm-cov report --cobertura --output-path coverage.xml
Expand All @@ -106,9 +109,6 @@ jobs:
PERCENT="$(echo "($RATE * 100)/1" | bc)"
echo "PERCENT=$PERCENT"
echo "COVERAGE_PERCENT=$PERCENT" >> $GITHUB_ENV
# Run clippy twice - once with the 1.89 MSRV, and once with the latest stable toolchain
- name: Clippy
run: cargo clippy --no-deps --all-targets --all-features
- name: Update coverage badge
uses: schneegans/dynamic-badges-action@v1.7.0
if: github.ref_name == github.event.repository.default_branch
Expand All @@ -125,6 +125,20 @@ jobs:
valColorRange: ${{ env.COVERAGE_PERCENT }}
minColorRange: 40
maxColorRange: 65
- name: Setup nightly toolchain (ASAN)
uses: dtolnay/rust-toolchain@master
with:
toolchain: nightly
components: rust-src
- name: Test (ASAN)
env:
RUSTFLAGS: -D warnings -Zsanitizer=address
run: cargo +nightly nextest run -Zbuild-std --target x86_64-unknown-linux-gnu --all-features --no-tests=warn
- name: Test (ASAN, vcan)
if: steps.vcan.outputs.available == 'true'
env:
RUSTFLAGS: -D warnings -Zsanitizer=address
run: cargo +nightly nextest run -Zbuild-std --target x86_64-unknown-linux-gnu --all-features --run-ignored ignored-only

# Canary job: verifies vcan is available on the runner. Shows yellow when the
# linux-modules-extra package drifts from the runner kernel version, which means the socketcan
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ version = "0.1.0-rc0"
edition = "2024"
license = "MIT"
rust-version = "1.89"
description = "Opinionated CAN utils written in Rust"
description = "Opinionated CAN utilities written in Rust"

[workspace.dependencies]
assert_cmd = { version = "2.2.0", features = ["color-auto"] }
ctor = "0.6"
eyre = "0.6"
gungraun = "0.17"
io-uring = "0.7"
libc = "0.2"
neli = "0.7"
tabled = "0.18"
tempfile = "3.27.0"
tracing = "0.1"
tracing-subscriber = "0.3"
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
![release workflow](https://github.com/Notgnoshi/candemonium/actions/workflows/release.yml/badge.svg?event=push)
![code coverage](https://img.shields.io/endpoint?url=https://gist.githubusercontent.com/Notgnoshi/55f3f6cae2abdc5d011d907624dfb883/raw/can-utils-rs-coverage.json)

Opinionated CAN utils written in Rust.
Opinionated CAN utilties written in Rust.

## Purpose

Expand All @@ -16,3 +16,9 @@ constraints.

A modern-ish Linux with io_uring and socketcan available. A ~4 core ~1GHz arm64 CPU with 1GB memory
and 4+ J1939 CAN networks.

## Documentation

* See [quickstart.md](/docs/developer/quickstart.md) for a developer quickstart
* See `docs/design/` for design documents
* See `docs/user/` for user documentation
2 changes: 1 addition & 1 deletion candumpr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ ci = []
eyre.workspace = true
io-uring.workspace = true
libc.workspace = true
tracing.workspace = true

[dev-dependencies]
ctor.workspace = true
gungraun.workspace = true
tabled.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
vcan-fixture = { path = "../vcan-fixture" }

Expand Down
67 changes: 57 additions & 10 deletions candumpr/benches/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};

use candumpr::can::{self, CanFrame};
Expand Down Expand Up @@ -51,19 +51,53 @@ pub const BACKENDS: &[BackendDef] = &[
},
];

// --- Sequence checker ---

fn frame_seq(frame: &CanFrame) -> u32 {
u32::from_le_bytes([frame.data[0], frame.data[1], frame.data[2], frame.data[3]])
}

struct SeqCheck {
expected: Vec<AtomicU32>,
}

impl SeqCheck {
fn new(n: usize) -> Self {
Self {
expected: (0..n).map(|_| AtomicU32::new(0)).collect(),
}
}

fn check(&self, idx: usize, frame: &CanFrame) {
let actual = frame_seq(frame);
let expected = self.expected[idx].load(Ordering::Relaxed);
if actual != expected {
tracing::warn!(
iface = idx,
received = actual,
expected = expected,
"out-of-sequence frame"
);
}
self.expected[idx].store(actual.wrapping_add(1), Ordering::Relaxed);
}
}

// --- Backend run functions ---
//
// Single-threaded backends: wrap the backend's run() with getrusage_thread() before/after.
// Dedicated backend: uses run_instrumented() to collect per-thread RUSAGE_THREAD and
// aggregate the deltas.

fn run_dedicated(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64) -> (u64, Rusage) {
let seq = SeqCheck::new(sockets.len());
let backend = DedicatedRecv::new(sockets);
let rusage = std::sync::Mutex::new(Rusage::default());
let total = backend
.run_instrumented(
stop,
&|_idx, _frame, _meta| {
&|idx, frame, _meta| {
seq.check(idx, frame);
count.fetch_add(1, Ordering::Relaxed);
},
&|_idx, inner| {
Expand All @@ -78,10 +112,12 @@ fn run_dedicated(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64
}

fn run_epoll(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64) -> (u64, Rusage) {
let seq = SeqCheck::new(sockets.len());
let mut backend = EpollRecv::new(sockets).unwrap();
let before = getrusage_thread();
let total = backend
.run(stop, &mut |_idx, _frame, _meta| {
.run(stop, &mut |idx, frame, _meta| {
seq.check(idx, frame);
count.fetch_add(1, Ordering::Relaxed);
})
.unwrap();
Expand All @@ -90,10 +126,12 @@ fn run_epoll(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64) ->
}

fn run_recvmmsg(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64) -> (u64, Rusage) {
let seq = SeqCheck::new(sockets.len());
let mut backend = RecvmmsgRecv::new(sockets).unwrap();
let before = getrusage_thread();
let total = backend
.run(stop, &mut |_idx, _frame, _meta| {
.run(stop, &mut |idx, frame, _meta| {
seq.check(idx, frame);
count.fetch_add(1, Ordering::Relaxed);
})
.unwrap();
Expand All @@ -102,10 +140,12 @@ fn run_recvmmsg(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64)
}

fn run_uring(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64) -> (u64, Rusage) {
let seq = SeqCheck::new(sockets.len());
let mut backend = UringRecv::new(sockets).unwrap();
let before = getrusage_thread();
let total = backend
.run(stop, &mut |_idx, _frame, _meta| {
.run(stop, &mut |idx, frame, _meta| {
seq.check(idx, frame);
count.fetch_add(1, Ordering::Relaxed);
})
.unwrap();
Expand All @@ -118,10 +158,12 @@ fn run_uring_multi(
stop: Arc<AtomicBool>,
count: &AtomicU64,
) -> (u64, Rusage) {
let seq = SeqCheck::new(sockets.len());
let mut backend = UringMultiRecv::new(sockets).unwrap();
let before = getrusage_thread();
let total = backend
.run(stop, &mut |_idx, _frame, _meta| {
.run(stop, &mut |idx, frame, _meta| {
seq.check(idx, frame);
count.fetch_add(1, Ordering::Relaxed);
})
.unwrap();
Expand Down Expand Up @@ -155,18 +197,23 @@ fn sender_loop(
}
frame_idx += 1;
}
// Let the receiver drain in-flight frames before signaling stop. Several receivers use 100ms
// as a timeout to wake themselves up. This isn't a great design, but it's possible to drop
// frames, so I can't just say "run until all frames have been received".
std::thread::sleep(Duration::from_millis(110));
stop.store(true, Ordering::Relaxed);
}

fn make_frame(iface_idx: usize, frame_idx: u32) -> CanFrame {
let seq = frame_idx.to_le_bytes();
CanFrame::new(
((iface_idx as u32) << 8) | (frame_idx & 0xFF) | libc::CAN_EFF_FLAG,
&[
seq[0],
seq[1],
seq[2],
seq[3],
iface_idx as u8,
frame_idx as u8,
0xDE,
0xAD,
0xBE,
0xEF,
0xCA,
0xFE,
Expand Down
72 changes: 72 additions & 0 deletions candumpr/examples/dump.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//! Listen on CAN interfaces using the io_uring multishot backend and print received frames.
//!
//! Usage: uring_multi_dump <iface> [iface...]

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use candumpr::can::{self, CanFrame};
use candumpr::recv::uring_multi::UringMultiRecv;

fn main() -> std::io::Result<()> {
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.with_max_level(tracing::Level::DEBUG)
.init();

let ifaces: Vec<String> = std::env::args().skip(1).collect();
if ifaces.is_empty() {
eprintln!("usage: uring_multi_dump <iface> [iface...]");
std::process::exit(1);
}

let sockets: Vec<_> = ifaces
.iter()
.map(|name| can::open_can_raw(name))
.collect::<std::io::Result<_>>()?;

let mut backend = UringMultiRecv::new(sockets)?;

let stop = Arc::new(AtomicBool::new(false));
let stop2 = stop.clone();
ctrlc(stop2);

let total = backend.run(stop, &mut |idx, frame, _meta| {
print_frame(idx, frame);
})?;

eprintln!("{total} frames received");
Ok(())
}

fn print_frame(idx: usize, frame: &CanFrame) {
let id = frame.can_id & !libc::CAN_EFF_FLAG & !libc::CAN_RTR_FLAG & !libc::CAN_ERR_FLAG;

print!("{idx} {id:08X} [{}]", frame.len);
for i in 0..frame.len as usize {
print!(" {:02X}", frame.data[i]);
}
println!();
}

/// Install a Ctrl-C handler that sets the stop flag.
fn ctrlc(stop: Arc<AtomicBool>) {
unsafe {
libc::signal(
libc::SIGINT,
signal_handler as *const () as libc::sighandler_t,
);
}
// Leak the Arc into a raw pointer so the signal handler can access it.
STOP_FLAG.store(Arc::into_raw(stop) as *mut _, Ordering::Release);
}

static STOP_FLAG: std::sync::atomic::AtomicPtr<AtomicBool> =
std::sync::atomic::AtomicPtr::new(std::ptr::null_mut());

extern "C" fn signal_handler(_sig: libc::c_int) {
let ptr = STOP_FLAG.load(Ordering::Acquire);
if !ptr.is_null() {
unsafe { &*ptr }.store(true, Ordering::Relaxed);
}
}
Loading