Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
[workspace]
members = [
"rtkbase",
"ntrip",
]
resolver = "2"

[package]
name = "maarco"
version = "0.1.0"
edition = "2024"
description = "Logging from Arduino and receiving NTRIP data via RTK2GO in Rust"

[dependencies]
base64 = "0.22.1"
chrono = "0.4.42"
clap = { version = "4.5.48", features = ["derive"] }
clap_derive = "4.5.47"
Expand All @@ -14,4 +21,6 @@ csv = "1.3.1"
eyre = "0.6.12"
nmea = { version = "0.7.0", features = ["default", "serde"] }
serde = "1.0.228"
serialport = { version = "4.7.3", default-features = false }
serialport = { version = "4.7.3", default-features = false }
ntrip = { path = "ntrip" }
rtkbase = { path = "rtkbase" }
2 changes: 0 additions & 2 deletions Cross.toml

This file was deleted.

8 changes: 8 additions & 0 deletions ntrip/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "ntrip"
version = "0.1.0"
edition = "2024"
description = "NTRIP Client for RTK2GO implementation in Rust"

[dependencies]
base64 = "0.22.1"
File renamed without changes.
8 changes: 8 additions & 0 deletions rtkbase/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "rtkbase"
version = "0.1.0"
edition = "2024"
description = "RTK Base Station code"

[dependencies]
serialport = { version = "4.7.3", default-features = false }
80 changes: 80 additions & 0 deletions rtkbase/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::protocol::response::WireMessage;
use std::sync::mpsc::{Sender};
use std::sync::{Arc, Mutex};

type MatchFn = Box<dyn Fn(&WireMessage) -> bool + Send + Sync>;


struct Waiter {
/// Function to determine if a message matches the waiter's criteria.
matches: MatchFn,
/// Transmitter to send matched messages to the waiter.
tx: Sender<WireMessage>,
/// How many messages are still needed before the waiter is satisfied:
remaining: u32,
}

#[derive(Clone)]
pub struct Dispatcher {
/// The transmitter for the main data stream. Used to forward messages not claimed by waiters.
stream_tx: Option<Sender<WireMessage>>,
/// The list of waiters listening for specific messages.
waiters: Arc<Mutex<Vec<Waiter>>>,
}

impl Dispatcher {
pub fn new() -> Self {
Dispatcher {
stream_tx: None,
waiters: Arc::new(Mutex::new(Vec::new())),
}
}

pub fn set_stream_tx(&mut self, tx: Sender<WireMessage>) {
self.stream_tx = Some(tx);
}

/// Registers a new waiter with a matching function and a transmitter.
pub fn register_waiter(&self, matches: MatchFn, tx: Sender<WireMessage>, count: u32) {
let mut waiters = self.waiters.lock().unwrap();
waiters.push(Waiter { matches, tx, remaining: count });
}

pub fn dispatch(&self, msg: WireMessage) {
// Try to satisfy waiters first:
let mut waiters = self.waiters.lock().unwrap();
let mut i = 0;

// println!("Waiters count: {}", waiters.len());
while i < waiters.len() {
if (waiters[i].matches)(&msg) {
// Send the message to the waiter:
// println!("A waiter claimed the message {:?}, sending to waiter.", &msg);
let _ = waiters[i].tx.send(msg.clone());
// Decrement remaining count
waiters[i].remaining -= 1;

// Remove if done
if waiters[i].remaining == 0 {
waiters.remove(i);
}
drop(waiters); // Release lock
return;
} else {
i += 1;
}
}
drop(waiters); // Release lock
// No waiter claimed the message, send to stream:
// println!("No waiter claimed the message, sending to stream.");
self.fanout_stream(msg);
}

fn fanout_stream(&self, msg: WireMessage) {
if let Some(ref tx) = self.stream_tx {
// println!("Sending message to stream.");
let _ = tx.send(msg);
}
}
}

5 changes: 5 additions & 0 deletions rtkbase/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod parsing;
pub mod port;
pub mod protocol;
pub mod dispatcher;
mod methods;
38 changes: 38 additions & 0 deletions rtkbase/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/// This file is just for testing the rtkbase crate.
///
use rtkbase;
use rtkbase::port::BaseGPS;
use rtkbase::protocol::response::WireMessage;
use std::{path::PathBuf};

fn test_reading_sentences() {
let mut rtk = BaseGPS::open_port(PathBuf::from("/dev/ttyUSB0")).unwrap();
let _ = rtk.start();
println!("Opened RTK GPS port successfully.");
let timeout = std::time::Duration::from_secs(2);

let mut count = 0;
while count < 5 {
if let Some(msg) = rtk.get_gps_data(timeout) {
match &msg {
WireMessage::PQTMMessage(resp) => {
println!("Received PQTM Response: {:?}", resp);
}
WireMessage::PairMessage(pair) => {
println!("Received PAIR Message: {:?}", pair);
}
}
}
count += 1;
}

let ver_no = rtk.verno(timeout).unwrap();
println!("Module Version: {:?}", ver_no.version);

let rtcm_output_mode = rtk.pair_get_rtcm_mode(timeout).unwrap();
println!("Current RTCM Output Mode: {:?}", rtcm_output_mode);
}

fn main() {
test_reading_sentences();
}
Loading