Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ resolver = "3"
members = [
"rt",
"concurrency",
"macros",
"examples/bank",
"examples/bank_threads",
"examples/name_server",
Expand All @@ -15,11 +16,14 @@ members = [
"examples/busy_genserver_warning",
"examples/signal_test",
"examples/signal_test_threads",
"examples/chat_room",
"examples/service_discovery",
]

[workspace.dependencies]
spawned-rt = { path = "rt", version = "0.4.5" }
spawned-concurrency = { path = "concurrency", version = "0.4.5" }
spawned-macros = { path = "macros", version = "0.4.5" }
tracing = { version = "0.1.41", features = ["log"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

Expand Down
1 change: 1 addition & 0 deletions concurrency/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license.workspace = true

[dependencies]
spawned-rt = { workspace = true }
spawned-macros = { workspace = true }
tracing = { workspace = true }
futures = "0.3.1"
thiserror = "2.0.12"
Expand Down
20 changes: 6 additions & 14 deletions concurrency/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,20 @@
#[derive(Debug, thiserror::Error)]
pub enum ActorError {
#[error("Callback Error")]
Callback,
#[error("Initialization error")]
Initialization,
#[error("Server error")]
Server,
#[error("Unsupported Request on this Actor")]
RequestUnused,
#[error("Unsupported Message on this Actor")]
MessageUnused,
#[error("Actor stopped")]
ActorStopped,
#[error("Request to Actor timed out")]
RequestTimeout,
}

impl<T> From<spawned_rt::threads::mpsc::SendError<T>> for ActorError {
fn from(_value: spawned_rt::threads::mpsc::SendError<T>) -> Self {
Self::Server
Self::ActorStopped
}
}

impl<T> From<spawned_rt::tasks::mpsc::SendError<T>> for ActorError {
fn from(_value: spawned_rt::tasks::mpsc::SendError<T>) -> Self {
Self::Server
Self::ActorStopped
}
}

Expand All @@ -32,7 +24,7 @@ mod tests {

#[test]
fn test_error_into_std_error() {
let error: &dyn std::error::Error = &ActorError::Callback;
assert_eq!(error.to_string(), "Callback Error");
let error: &dyn std::error::Error = &ActorError::ActorStopped;
assert_eq!(error.to_string(), "Actor stopped");
}
}
5 changes: 2 additions & 3 deletions concurrency/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! spawned concurrency
//! Some basic traits and structs to implement concurrent code à-la-Erlang.
pub mod error;
pub mod messages;
pub mod message;
pub mod registry;
pub mod tasks;
pub mod threads;
136 changes: 136 additions & 0 deletions concurrency/src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
pub trait Message: Send + 'static {
type Result: Send + 'static;
}

/// Declarative macro for defining message types.
///
/// Supports both unit structs and structs with fields, and they can be mixed
/// in a single invocation:
///
/// ```ignore
/// messages! {
/// GetCount -> u64;
/// Deposit { who: String, amount: i32 } -> Result<u64, BankError>;
/// Stop -> ()
/// }
/// ```
#[macro_export]
macro_rules! messages {
() => {};

// Base: unit message
($(#[$meta:meta])* $name:ident -> $result:ty) => {
$(#[$meta])*
pub struct $name;
impl $crate::message::Message for $name {
type Result = $result;
}
};

// Base: struct message
($(#[$meta:meta])* $name:ident { $($field:ident : $ftype:ty),* $(,)? } -> $result:ty) => {
$(#[$meta])*
pub struct $name { $(pub $field: $ftype,)* }
impl $crate::message::Message for $name {
type Result = $result;
}
};

// Recursive: unit message followed by more
($(#[$meta:meta])* $name:ident -> $result:ty; $($rest:tt)*) => {
$crate::messages!($(#[$meta])* $name -> $result);
$crate::messages!($($rest)*);
};

// Recursive: struct message followed by more
($(#[$meta:meta])* $name:ident { $($field:ident : $ftype:ty),* $(,)? } -> $result:ty; $($rest:tt)*) => {
$crate::messages!($(#[$meta])* $name { $($field : $ftype),* } -> $result);
$crate::messages!($($rest)*);
};
}

/// Fire-and-forget messages (Result type is always `()`).
///
/// ```ignore
/// send_messages! {
/// Increment;
/// Deposit { who: String, amount: i32 }
/// }
/// ```
#[macro_export]
macro_rules! send_messages {
() => {};

// Base: unit message
($(#[$meta:meta])* $name:ident) => {
$(#[$meta])*
pub struct $name;
impl $crate::message::Message for $name {
type Result = ();
}
};

// Base: struct message
($(#[$meta:meta])* $name:ident { $($field:ident : $ftype:ty),* $(,)? }) => {
$(#[$meta])*
pub struct $name { $(pub $field: $ftype,)* }
impl $crate::message::Message for $name {
type Result = ();
}
};

// Recursive: unit message followed by more
($(#[$meta:meta])* $name:ident; $($rest:tt)*) => {
$crate::send_messages!($(#[$meta])* $name);
$crate::send_messages!($($rest)*);
};

// Recursive: struct message followed by more
($(#[$meta:meta])* $name:ident { $($field:ident : $ftype:ty),* $(,)? }; $($rest:tt)*) => {
$crate::send_messages!($(#[$meta])* $name { $($field : $ftype),* });
$crate::send_messages!($($rest)*);
};
}

/// Request-response messages (Result type is explicitly specified).
///
/// ```ignore
/// request_messages! {
/// GetCount -> u64;
/// Lookup { key: String } -> Option<String>
/// }
/// ```
#[macro_export]
macro_rules! request_messages {
() => {};

// Base: unit message
($(#[$meta:meta])* $name:ident -> $result:ty) => {
$(#[$meta])*
pub struct $name;
impl $crate::message::Message for $name {
type Result = $result;
}
};

// Base: struct message
($(#[$meta:meta])* $name:ident { $($field:ident : $ftype:ty),* $(,)? } -> $result:ty) => {
$(#[$meta])*
pub struct $name { $(pub $field: $ftype,)* }
impl $crate::message::Message for $name {
type Result = $result;
}
};

// Recursive: unit message followed by more
($(#[$meta:meta])* $name:ident -> $result:ty; $($rest:tt)*) => {
$crate::request_messages!($(#[$meta])* $name -> $result);
$crate::request_messages!($($rest)*);
};

// Recursive: struct message followed by more
($(#[$meta:meta])* $name:ident { $($field:ident : $ftype:ty),* $(,)? } -> $result:ty; $($rest:tt)*) => {
$crate::request_messages!($(#[$meta])* $name { $($field : $ftype),* } -> $result);
$crate::request_messages!($($rest)*);
};
}
2 changes: 0 additions & 2 deletions concurrency/src/messages.rs

This file was deleted.

91 changes: 91 additions & 0 deletions concurrency/src/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use std::any::Any;
use std::collections::HashMap;
use std::sync::{OnceLock, RwLock};

type Store = RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>;

fn global_store() -> &'static Store {
static STORE: OnceLock<Store> = OnceLock::new();
STORE.get_or_init(|| RwLock::new(HashMap::new()))
}

#[derive(Debug, thiserror::Error)]
pub enum RegistryError {
#[error("name '{0}' is already registered")]
AlreadyRegistered(String),
}

pub fn register<T: Send + Sync + 'static>(name: &str, value: T) -> Result<(), RegistryError> {
let mut store = global_store().write().unwrap_or_else(|p| p.into_inner());
if store.contains_key(name) {
return Err(RegistryError::AlreadyRegistered(name.to_string()));
}
store.insert(name.to_string(), Box::new(value));
Ok(())
}

pub fn whereis<T: Clone + Send + Sync + 'static>(name: &str) -> Option<T> {
let store = global_store().read().unwrap_or_else(|p| p.into_inner());
store.get(name)?.downcast_ref::<T>().cloned()
}

pub fn unregister(name: &str) {
let mut store = global_store().write().unwrap_or_else(|p| p.into_inner());
store.remove(name);
}

pub fn registered() -> Vec<String> {
let store = global_store().read().unwrap_or_else(|p| p.into_inner());
store.keys().cloned().collect()
}

#[cfg(test)]
mod tests {
use super::*;

// Use unique names per test to avoid cross-test interference with global state.

#[test]
fn register_and_whereis() {
register("test_rw_1", 42u64).unwrap();
let val: Option<u64> = whereis("test_rw_1");
assert_eq!(val, Some(42));
}

#[test]
fn whereis_wrong_type_returns_none() {
register("test_wt_1", 42u64).unwrap();
let val: Option<String> = whereis("test_wt_1");
assert_eq!(val, None);
}

#[test]
fn whereis_missing_returns_none() {
let val: Option<u64> = whereis("nonexistent_key");
assert_eq!(val, None);
}

#[test]
fn duplicate_register_fails() {
register("test_dup_1", 1u32).unwrap();
let result = register("test_dup_1", 2u32);
assert!(result.is_err());
}

#[test]
fn unregister_removes_entry() {
register("test_unreg_1", "hello".to_string()).unwrap();
unregister("test_unreg_1");
let val: Option<String> = whereis("test_unreg_1");
assert_eq!(val, None);
}

#[test]
fn registered_lists_names() {
register("test_list_a", 1u32).unwrap();
register("test_list_b", 2u32).unwrap();
let names = registered();
assert!(names.contains(&"test_list_a".to_string()));
assert!(names.contains(&"test_list_b".to_string()));
}
}
Loading