diff --git a/Cargo.lock b/Cargo.lock index 86d9803..d1be89d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + [[package]] name = "autocfg" version = "1.5.0" @@ -43,6 +49,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "buqueue-memory" +version = "0.1.0" +dependencies = [ + "buqueue-core", + "bytes", + "chrono", + "futures", + "tokio", + "tokio-test", + "uuid", +] + [[package]] name = "bytes" version = "1.11.1" @@ -84,6 +103,12 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "errno" version = "0.3.14" @@ -100,6 +125,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "futures" version = "0.3.32" @@ -188,6 +219,40 @@ dependencies = [ "slab", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + +[[package]] +name = "hashbrown" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -212,6 +277,24 @@ dependencies = [ "cc", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + +[[package]] +name = "indexmap" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" +dependencies = [ + "equivalent", + "hashbrown 0.17.0", + "serde", + "serde_core", +] + [[package]] name = "itoa" version = "1.0.18" @@ -228,6 +311,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" version = "0.2.183" @@ -310,6 +399,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -328,6 +427,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" 
+version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "redox_syscall" version = "0.5.18" @@ -349,6 +454,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" + [[package]] name = "serde" version = "1.0.228" @@ -489,6 +600,28 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-test" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6d24790a10a7af737693a3e8f1d03faef7e6ca0cc99aae5066f533766de545" +dependencies = [ + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tracing" version = "0.1.44" @@ -526,12 +659,47 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + +[[package]] +name = "uuid" +version = "1.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9" +dependencies = [ + "getrandom", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" +[[package]] +name = "wasip2" +version = "1.0.2+wasi-0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasm-bindgen" version = "0.2.114" @@ -577,6 +745,40 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -645,6 +847,94 @@ dependencies = [ "windows-link", ] +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = 
"0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/Cargo.toml b/Cargo.toml index d4344e6..a3f85cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "crates/buqueue-core", + "crates/buqueue-memory", ] resolver = "3" @@ -9,7 +10,7 @@ resolver = "3" edition="2024" [workspace.dependencies] 
-buqueue-core = { path = "buqueue-core" } +buqueue-core = { path = "crates/buqueue-core" } thiserror = { version = "2.0.18" } bytes = "1.11.1" serde = { version = "1.0.228", features = ["derive"] } @@ -18,6 +19,8 @@ chrono = "0.4.44" tokio = { version = "1.50.0", features = ["full"] } tracing = "0.1.44" futures = "0.3.32" +uuid = { version = "1.23.0", features = ["v4"] } +tokio-test = "0.4.5" [workspace.lints.rust] missing_docs = "warn" diff --git a/crates/buqueue-core/src/traits/consumer.rs b/crates/buqueue-core/src/traits/consumer.rs index b12f1f9..b1ddd3f 100644 --- a/crates/buqueue-core/src/traits/consumer.rs +++ b/crates/buqueue-core/src/traits/consumer.rs @@ -23,11 +23,12 @@ //! The `ref_delegate!` macro therefore can only covers `Box` and `&mut T`, //! not `Arc`. -use std::pin::Pin; - -use futures::{Stream, stream}; - use crate::prelude::{BuqueueResult, Delivery, ShutdownHandle}; +use futures::{ + Stream, StreamExt, + stream::{self, BoxStream}, +}; +use std::pin::Pin; // -------- QueueConsumer ------------------------------ @@ -136,6 +137,26 @@ pub trait QueueConsumer: Send { } } + /// Returns an async stream of messages from this consumer + /// This method borrows `&mut self`, allowing the consumer to be used again + /// after the stream is dropped + fn messages( + &mut self, + ) -> impl Future> + Send + '_>> + + Send { + async move { + let stream = stream::try_unfold(self, |consumer| async { + match consumer.receive_graceful().await { + Some(Ok(delivery)) => Ok(Some((delivery, consumer))), + Some(Err(e)) => Err(e), + None => Ok(None), + } + }); + + Ok(stream.boxed()) + } + } + /// Convert this consumer into an async `Stream` of deliverie /// /// Consumes `self`. 
Implemented with `futures::stream::unfold`, no unsafe @@ -153,12 +174,12 @@ pub trait QueueConsumer: Send { where Self: Sized + 'static, { - stream::unfold(self, |mut consumer| async move { - if consumer.shutdown_handle().is_shutdown() { - return None; + stream::try_unfold(self, |mut consumer| async move { + match consumer.receive_graceful().await { + Some(Ok(delivery)) => Ok(Some((delivery, consumer))), + Some(Err(e)) => Err(e), + None => Ok(None), } - let item = consumer.receive().await; - Some((item, consumer)) }) } @@ -216,6 +237,14 @@ macro_rules! ref_delefate { ) -> impl Future>> + Send { (**self).receive_graceful() } + + fn messages( + &mut self, + ) -> impl Future< + Output = BuqueueResult> + Send + '_>, + > + Send { + (**self).messages() + } } }; } @@ -228,6 +257,8 @@ ref_delefate!(T, Box); // The dyn-compatible bridge. Same reasoning as ErasedQueueProducer. // into_stream and into_dyn are excluded, both require Self: Sized +type MessageStream<'a> = BoxStream<'a, BuqueueResult>; + pub(crate) trait ErasedQueueConsumer: Send { fn receive<'a>( &'a mut self, @@ -247,6 +278,10 @@ pub(crate) trait ErasedQueueConsumer: Send { fn receive_graceful<'a>( &'a mut self, ) -> Pin>> + Send + 'a>>; + + fn messages<'a>( + &'a mut self, + ) -> Pin>> + Send + 'a>>; } // --- DynConsumerInner ------------------ @@ -284,6 +319,15 @@ impl ErasedQueueConsumer for DynConsumerInner { ) -> Pin>> + Send + 'a>> { Box::pin(self.inner.receive_graceful()) } + + fn messages<'a>( + &'a mut self, + ) -> Pin>> + Send + 'a>> { + Box::pin(async move { + let stream = self.inner.messages().await?; + Ok(Box::pin(stream) as BoxStream<'a, BuqueueResult>) + }) + } } // ---- BaseDynConsumer / DynConsumer ---------------------- @@ -344,6 +388,15 @@ impl<'a> BaseDynConsumer<'a> { pub async fn receive_graceful(&mut self) -> Option> { self.0.receive_graceful().await } + + /// Returns an async stream of messages from this consumer + /// + /// # Errors + /// + /// Return Error emitted by backend + pub 
async fn messages(&mut self) -> BuqueueResult>> { + self.0.messages().await + } } impl std::fmt::Debug for BaseDynConsumer<'_> { diff --git a/crates/buqueue-memory/Cargo.toml b/crates/buqueue-memory/Cargo.toml new file mode 100644 index 0000000..31fd71a --- /dev/null +++ b/crates/buqueue-memory/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "buqueue-memory" +version = "0.1.0" +edition.workspace = true +description = "In-Memory backend for the buqueue" +license = "MIT" + +[dependencies] +buqueue-core = { workspace = true } +tokio = { workspace = true } +bytes = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } +tokio-test = { workspace = true } +futures = { workspace = true } + +[lints] +workspace = true diff --git a/crates/buqueue-memory/src/ack.rs b/crates/buqueue-memory/src/ack.rs new file mode 100644 index 0000000..26d25d3 --- /dev/null +++ b/crates/buqueue-memory/src/ack.rs @@ -0,0 +1,58 @@ +//! Ack/nack handle for in-memory deliveries + +use std::{pin::Pin, sync::Arc}; + +use crate::envelope::{Envelope, SharedState}; +use buqueue_core::{delivery::AckHandle, error::BuqueueResult}; +use tokio::sync::{Mutex, mpsc}; + +/// The ack handle attached to every `MemoryConsumer` delivery +/// +/// - `ack()`: drops the message (not requeued) +/// - `nack()`: requeues with an incremented delivery count, or routes +/// to the DLQ once `max_receive_count` is reached +#[derive(Debug)] +pub(crate) struct MemoryAckHandle { + pub(crate) envelope: Envelope, + pub(crate) requeue_tx: mpsc::Sender, + pub(crate) shared: Arc>, +} + +impl AckHandle for MemoryAckHandle { + fn ack(&self) -> Pin> + Send + '_>> { + // Simply drop, not requeued + Box::pin(async move { Ok(()) }) + } + + fn nack(&self) -> Pin> + Send + '_>> { + Box::pin(async move { + let mut state = self.shared.lock().await; + + let max = state + .dlq_config + .as_ref() + .map_or(u32::MAX, |c| c.max_receive_count); + + let count = state + .nack_counts + .entry(self.envelope.id.clone()) + 
.or_insert(0); + *count += 1; + + if *count >= max { + // Route the DLQ if configured, otherwise silently drop + if let Some(dlq_tx) = &state.dlq_tx { + let mut dlq_envelope = self.envelope.clone(); + dlq_envelope.delivery_count += 1; + let _ = dlq_tx.send(dlq_envelope).await; + } + } else { + // Requeue with incremented delivery count + let mut requeued = self.envelope.clone(); + requeued.delivery_count += 1; + let _ = self.requeue_tx.send(requeued).await; + } + Ok(()) + }) + } +} diff --git a/crates/buqueue-memory/src/builder.rs b/crates/buqueue-memory/src/builder.rs new file mode 100644 index 0000000..e5f5462 --- /dev/null +++ b/crates/buqueue-memory/src/builder.rs @@ -0,0 +1,133 @@ +//! Builder and backend entry point for the in-memory backend +//! +//! `MemoryBackend` exposes `builder()` as a plain inherent method + +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; + +use buqueue_core::{ + backend::BackendBuilder, dlq::DlqConfig, error::BuqueueResult, prelude::ShutdownHandle, +}; +use tokio::sync::{Mutex, mpsc}; + +use crate::{ + MemoryConfig, consumer::MemoryConsumer, envelope::SharedState, producer::MemoryProducer, +}; + +// ------ MemoryBuilder ------------------ + +/// Fulent builder for the in-memory backend +/// +/// Obtain one via `MemoryBackend::builder` +pub struct MemoryBuilder { + pub(crate) config: MemoryConfig, + pub(crate) dlq_config: Option, +} + +impl BackendBuilder for MemoryBuilder { + type Producer = MemoryProducer; + type Consumer = MemoryConsumer; + + fn dead_letter_queue(mut self, config: DlqConfig) -> Self { + self.dlq_config = Some(config); + self + } + + async fn build_pair(self) -> BuqueueResult<(MemoryProducer, MemoryConsumer)> { + let capacity = self.config.capacity.unwrap_or(1024); + let (tx, rx) = mpsc::channel(capacity); + + let (dlq_tx, dlq_config) = if let Some(cfg) = self.dlq_config { + let (dlq_tx, _dlq_rx) = mpsc::channel(capacity); + (Some(dlq_tx), Some(cfg)) + } else { + (None, None) + }; + + let shared = 
Arc::new(Mutex::new(SharedState { + dlq_config, + dlq_tx, + nack_counts: HashMap::new(), + scheduled: BTreeMap::new(), + })); + + let (liveness_tx, liveness_rx) = mpsc::channel(1); + + let shutdown = ShutdownHandle::new(); + let producer = MemoryProducer { + tx: tx.clone(), + shared: Arc::clone(&shared), + _liveness: liveness_tx, + }; + let consumer = MemoryConsumer { + rx, + tx, + shared, + shutdown, + liveness_rx, + }; + + Ok((producer, consumer)) + } + + async fn build_producer(self) -> BuqueueResult { + let (p, _) = self.build_pair().await?; + Ok(p) + } + + async fn build_consumer(self) -> BuqueueResult { + let (_, c) = self.build_pair().await?; + Ok(c) + } +} + +// ------ MemoryBackend ----------------- + +/// The in-memory backend +/// +/// No broker, no Docker, no network, backed by `tokio::sync::mpsc` channel. +/// Use this in all your unit and integration tests. +/// +/// ## Usage +/// +/// ```rust +/// # tokio_test::block_on(async { +/// use buqueue_memory::{MemoryBackend, MemoryConfig}; +/// use buqueue_core::prelude::*; +/// +/// let (producer, mut consumer) = MemoryBackend::builder(MemoryConfig::default()) +/// .build_pair() +/// .await +/// .unwrap(); +/// +/// producer.send(Message::from_json(&42u32).unwrap()).await.unwrap(); +/// let delivery = consumer.receive().await.unwrap(); +/// assert_eq!(delivery.payload_json::().unwrap(), 42); +/// delivery.ack().await.unwrap(); +/// # }); +/// ``` +pub struct MemoryBackend; + +impl MemoryBackend { + /// Create a builder for the in-memory backend + /// + /// This is a plain inherent method, there is no `QueueBackend` trait + /// The call site looks the same as every other backend + /// + /// ```rust,ignore + /// MemoryBackend::builder(MemoryConfig::default()) + /// .dead_letter_queue(dlq) // optional + /// .make_dynamic() // optional + /// .build_pair() + /// .await? 
+ /// ``` + #[must_use] + pub fn builder(config: MemoryConfig) -> MemoryBuilder { + MemoryBuilder { + config, + dlq_config: None, + } + } +} diff --git a/crates/buqueue-memory/src/config.rs b/crates/buqueue-memory/src/config.rs new file mode 100644 index 0000000..294e6ea --- /dev/null +++ b/crates/buqueue-memory/src/config.rs @@ -0,0 +1,14 @@ +//! Configuration for the in-memory backend + +/// Configuration for the in-memory backend +/// +/// There is intentinally very little to configure, this backend exists +/// for testing, not production. +#[derive(Debug, Clone, Default)] +pub struct MemoryConfig { + /// Internal channel buffer capacity + /// + /// Default to `1024`. Increase if your tests send may messages + /// before consuming any of them + pub capacity: Option, +} diff --git a/crates/buqueue-memory/src/consumer.rs b/crates/buqueue-memory/src/consumer.rs new file mode 100644 index 0000000..c62b0c1 --- /dev/null +++ b/crates/buqueue-memory/src/consumer.rs @@ -0,0 +1,138 @@ +//! In-memory consumer implementation + +use std::sync::Arc; + +use buqueue_core::{ + error::{BuqueueError, BuqueueResult, ErrorKind}, + prelude::{Delivery, QueueConsumer, ShutdownHandle}, +}; +use chrono::Utc; +use tokio::sync::{Mutex, mpsc}; + +use crate::{ + ack::MemoryAckHandle, + envelope::{Envelope, SharedState}, +}; + +/// In-memory consumer +/// +/// Receives messages from the channel written to by `MemoryProducer` +/// Not `Clone`, only one consumer per channel +pub struct MemoryConsumer { + pub(crate) rx: mpsc::Receiver, + pub(crate) tx: mpsc::Sender, + pub(crate) shared: Arc>, + pub(crate) shutdown: ShutdownHandle, + pub(crate) liveness_rx: mpsc::Receiver<()>, +} + +impl std::fmt::Debug for MemoryConsumer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MemoryConsumer").finish_non_exhaustive() + } +} + +impl MemoryConsumer { + pub(crate) fn make_delivery(&self, envelope: Envelope) -> Delivery { + let handle = Arc::new(MemoryAckHandle { + 
envelope: envelope.clone(), + requeue_tx: self.tx.clone(), + shared: Arc::clone(&self.shared), + }); + Delivery::new( + envelope.payload, + envelope.headers, + envelope.routing_key, + envelope.delivery_count, + Some(envelope.first_delivered), + handle, + ) + } + + /// Flush any schedlued messages that are now due into the main channel + /// + /// Must be called before receive/`try_receive` so that scheduled + /// messages become visible at the correct time + async fn flush_scheduled(&mut self) { + let mut state = self.shared.lock().await; + state.flush_due_scheduled(&self.tx).await; + } +} + +impl QueueConsumer for MemoryConsumer { + async fn receive(&mut self) -> BuqueueResult { + loop { + if self.shutdown.is_shutdown() { + return Err(BuqueueError::new(ErrorKind::ConsumerShutdown)); + } + + // deal with now-due scheduled message into the channel before polling + self.flush_scheduled().await; + + if let Ok(envelope) = self.rx.try_recv() { + return Ok(self.make_delivery(envelope)); + } + + let next_due = self.shared.lock().await.next_scheduled_at(); + + let delay = next_due.map(|at| { + (at - Utc::now()) + .to_std() + .unwrap_or(std::time::Duration::ZERO) + }); + + tokio::select! { + biased; + envelope = self.rx.recv() => { + let envelope = envelope.ok_or_else(|| BuqueueError::new(ErrorKind::ConnectionLost))?; + return Ok(self.make_delivery(envelope)); + } + // The timer branch + () = async { + match delay { + Some(d) => tokio::time::sleep(d).await, + None => std::future::pending().await, + } + } => { + // We do nothing here. + // The loop will naturally restart and call flush_scheduled() at the top. 
+ } + _ = self.liveness_rx.recv() => { + return Err(BuqueueError::new(ErrorKind::ConnectionLost)); + } + } + } + } + + async fn try_receive(&mut self) -> BuqueueResult> { + // FLlush scheduled messages first, a scheduled message whose time + // has arrived must be visible to try_receive, not silently skipped + self.flush_scheduled().await; + match self.rx.try_recv() { + Ok(envelope) => Ok(Some(self.make_delivery(envelope))), + Err(mpsc::error::TryRecvError::Empty) => Ok(None), + Err(mpsc::error::TryRecvError::Disconnected) => { + Err(BuqueueError::new(ErrorKind::ConnectionLost)) + } + } + } + + fn shutdown_handle(&self) -> ShutdownHandle { + self.shutdown.clone() + } + + // Overrides the default, races receive() agains shutdown signal directly + // using `biased` so shutdown always takes priority + async fn receive_graceful(&mut self) -> Option> { + if self.shutdown.is_shutdown() { + return None; + } + + let shutdown = self.shutdown.clone(); + tokio::select! { + biased; + () = shutdown.wait_for_shutdown() => None, + delivery = self.receive() => Some(delivery) + } + } +} diff --git a/crates/buqueue-memory/src/envelope.rs b/crates/buqueue-memory/src/envelope.rs new file mode 100644 index 0000000..f428bd1 --- /dev/null +++ b/crates/buqueue-memory/src/envelope.rs @@ -0,0 +1,66 @@ +//! 
Internal state management for the memory backend + +use std::collections::{BTreeMap, HashMap}; + +use buqueue_core::dlq::DlqConfig; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use tokio::sync::mpsc; + +/// A message as it travles thourgh the in-memory channel +/// +/// Carries the original payload and metadata plus delivery tracking state +#[derive(Debug, Clone)] +pub(crate) struct Envelope { + /// Unique ID assigned on send, used to track nack counts per message + pub(crate) id: String, + #[allow(dead_code)] + pub(crate) payload: Bytes, + #[allow(dead_code)] + pub(crate) headers: HashMap, + #[allow(dead_code)] + pub(crate) routing_key: Option, + /// Starts as 1, increment on each nack'd cycle + pub(crate) delivery_count: u32, + #[allow(dead_code)] + pub(crate) first_delivered: DateTime, +} + +/// Mutable state shared between the producer and the ack handle +/// +/// Wrapped inĀ `Arc>` so both side can access it safely across tasks +#[derive(Debug)] +pub(crate) struct SharedState { + pub(crate) dlq_config: Option, + pub(crate) dlq_tx: Option>, + /// How many times each message ID has been nack'd + pub(crate) nack_counts: HashMap, + /// Schedules messages not yet due for delivery + /// + /// Using `Vec` per key handles the case of + /// two messages scheduled for the exact same millisecond + pub(crate) scheduled: BTreeMap, Vec>, +} + +impl SharedState { + /// Returns the timestamp of the earliest scheduled message, if any + pub(crate) fn next_scheduled_at(&self) -> Option> { + self.scheduled.keys().next().copied() + } + + /// Drain all scheduled envelopes whose `deliver_at <= now` and send them + /// into `tx`. 
Called by the consumer before each receive/`try_receive` + pub(crate) async fn flush_due_scheduled(&mut self, tx: &mpsc::Sender) { + let now = Utc::now(); + + let remaining = self + .scheduled + .split_off(&(now + chrono::Duration::nanoseconds(1))); + let due = std::mem::replace(&mut self.scheduled, remaining); + for (_delivery_at, envelopes) in due { + for envelope in envelopes { + let _ = tx.send(envelope).await; + } + } + } +} diff --git a/crates/buqueue-memory/src/lib.rs b/crates/buqueue-memory/src/lib.rs new file mode 100644 index 0000000..47557f7 --- /dev/null +++ b/crates/buqueue-memory/src/lib.rs @@ -0,0 +1,49 @@ +//! # buqueue-memory +//! +//! An in-memory implentation if the `buqueue` traits +//! +//! This backend is ideal for: +//! - Unit and integration testing withouth external dependecies +//! - Local development of event-driven logic +//! - High-speed, transient messaging withing a single process +//! +//! It supports all core features: AFIT (Async Functions in Traits), +//! Graceful shutdown, Scheduled Delivery (`send_at`), and Dead Letter Queues (DLQ) +//! +//! ## Quick example +//! +//! ```rust +//! # tokio_test::block_on(async { +//! use buqueue_memory::{MemoryBackend, MemoryConfig}; +//! use buqueue_core::prelude::*; +//! +//! let (producer, mut consumer) = MemoryBackend::builder(MemoryConfig::default()) +//! .build_pair() +//! .await +//! .unwrap(); +//! +//! producer.send(Message::from_json(&42u32).unwrap()).await.unwrap(); +//! let delivery = consumer.receive().await.unwrap(); +//! assert_eq!(delivery.payload_json::().unwrap(), 42); +//! delivery.ack().await.unwrap(); +//! # }); +//! 
``` + +#![warn(missing_docs)] +#![forbid(unsafe_code)] + +pub mod builder; +pub mod config; +pub mod consumer; +pub mod producer; + +mod ack; +mod envelope; + +#[cfg(test)] +mod tests; + +pub use builder::{MemoryBackend, MemoryBuilder}; +pub use config::MemoryConfig; +pub use consumer::MemoryConsumer; +pub use producer::MemoryProducer; diff --git a/crates/buqueue-memory/src/producer.rs b/crates/buqueue-memory/src/producer.rs new file mode 100644 index 0000000..aa104b0 --- /dev/null +++ b/crates/buqueue-memory/src/producer.rs @@ -0,0 +1,76 @@ +//! In-memory producer implementation + +use std::sync::Arc; + +use buqueue_core::{ + error::{BuqueueError, BuqueueResult, ErrorKind}, + prelude::{Message, MessageId, QueueProducer}, +}; +use chrono::Utc; +use tokio::sync::{Mutex, mpsc}; +use uuid::Uuid; + +use crate::envelope::{Envelope, SharedState}; + +/// In-memory producer +/// +/// Cheaply cloneable, share across tasks freely with `.clone()` or `Arc` +/// Both the clone and the original write to the same underlying channel +#[derive(Debug, Clone)] +pub struct MemoryProducer { + pub(crate) tx: mpsc::Sender, + #[allow(dead_code)] + pub(crate) shared: Arc>, + pub(crate) _liveness: mpsc::Sender<()>, +} + +impl QueueProducer for MemoryProducer { + async fn send(&self, message: Message) -> BuqueueResult { + let id = Uuid::new_v4().to_string(); + let envelope = Envelope { + id: id.clone(), + payload: message.payload().clone(), + headers: message.headers().clone(), + routing_key: message.routing_key().map(str::to_string), + delivery_count: 1, + first_delivered: Utc::now(), + }; + + self.tx + .send(envelope) + .await + .map_err(|_| BuqueueError::new(ErrorKind::ConnectionLost))?; + + Ok(MessageId::from(id)) + } + + async fn send_at( + &self, + message: Message, + delivery_at: chrono::DateTime, + ) -> BuqueueResult { + let id = Uuid::new_v4().to_string(); + let envelope = Envelope { + id: id.clone(), + payload: message.payload().clone(), + headers: message.headers().clone(), + 
+            routing_key: message.routing_key().map(str::to_string),
+            delivery_count: 1,
+            first_delivered: Utc::now(),
+        };
+
+        // Put into the scheduled map, not the main channel.
+        // The consumer drains due messages before each receive/try_receive,
+        // so the message will not be visible until deliver_at has passed.
+        {
+            let mut state = self.shared.lock().await;
+            state
+                .scheduled
+                .entry(delivery_at)
+                .or_insert_with(Vec::new)
+                .push(envelope);
+        }
+
+        Ok(MessageId::from(id))
+    }
+}
diff --git a/crates/buqueue-memory/src/tests.rs b/crates/buqueue-memory/src/tests.rs
new file mode 100644
index 0000000..eaea6c3
--- /dev/null
+++ b/crates/buqueue-memory/src/tests.rs
@@ -0,0 +1,800 @@
+//! Unit tests for the in-memory backend.
+//!
+//! Every public API behaviour is covered here. These tests are also the
+//! canonical examples of how to use `buqueue-memory`.
+
+use std::vec;
+
+use buqueue_core::{
+    backend::BackendBuilder,
+    dlq::DlqConfig,
+    prelude::{Message, QueueConsumer, QueueProducer},
+};
+use bytes::Bytes;
+use chrono::Utc;
+
+use crate::{MemoryBackend, MemoryConfig, MemoryConsumer, MemoryProducer};
+
+async fn make_pair() -> (MemoryProducer, MemoryConsumer) {
+    MemoryBackend::builder(MemoryConfig::default())
+        .build_pair()
+        .await
+        .unwrap()
+}
+
+#[tokio::test]
+async fn send_and_receive() {
+    let (producer, mut consumer) = make_pair().await;
+    producer
+        .send(Message::from_json(&42u32).unwrap())
+        .await
+        .unwrap();
+    let delivery = consumer.receive().await.unwrap();
+    assert_eq!(delivery.payload_json::<u32>().unwrap(), 42);
+    delivery.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn ack_removes_message_permanently() {
+    let (producer, mut consumer) = make_pair().await;
+
+    producer
+        .send(Message::from_json(&1u32).unwrap())
+        .await
+        .unwrap();
+    producer
+        .send(Message::from_json(&2u32).unwrap())
+        .await
+        .unwrap();
+
+    let d1 = consumer.receive().await.unwrap();
+    assert_eq!(d1.payload_json::<u32>().unwrap(), 1);
+    d1.ack().await.unwrap();
+
+    let d2 = consumer.receive().await.unwrap();
+    assert_eq!(d2.payload_json::<u32>().unwrap(), 2);
+    assert_eq!(
+        d2.delivery_count(),
+        1,
+        "message 2 should be a first delivery, not a redelivery of message 1"
+    );
+    d2.ack().await.unwrap();
+
+    assert!(
+        consumer.try_receive().await.unwrap().is_none(),
+        "queue should be empty after both messages were ack'd"
+    );
+}
+
+#[tokio::test]
+async fn routing_key_preserved() {
+    let (producer, mut consumer) = make_pair().await;
+    producer
+        .send(Message::from_json_with_key(&1u32, "orders.placed").unwrap())
+        .await
+        .unwrap();
+    let d = consumer.receive().await.unwrap();
+    assert_eq!(d.routing_key(), Some("orders.placed"));
+    d.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn headers_preserved() {
+    let (producer, mut consumer) = make_pair().await;
+    let msg = Message::builder()
+        .payload(Bytes::from_static(b"hello"))
+        .header("x-custom", "value-123")
+        .build()
+        .unwrap();
+    producer.send(msg).await.unwrap();
+    let d = consumer.receive().await.unwrap();
+    assert_eq!(d.header("x-custom"), Some("value-123"));
+    d.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn nack_requeues_with_incremented_count() {
+    let (producer, mut consumer) = make_pair().await;
+    producer
+        .send(Message::from_json(&99u32).unwrap())
+        .await
+        .unwrap();
+
+    let d1 = consumer.receive().await.unwrap();
+    assert_eq!(d1.delivery_count(), 1);
+    d1.nack().await.unwrap();
+
+    let d2 = consumer.receive().await.unwrap();
+    assert_eq!(d2.delivery_count(), 2);
+    d2.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn nack_routes_to_dlq_after_max_count() {
+    let (producer, mut consumer) = MemoryBackend::builder(MemoryConfig::default())
+        .dead_letter_queue(DlqConfig::new("test-dlq".to_string(), 2))
+        .build_pair()
+        .await
+        .unwrap();
+
+    producer
+        .send(Message::from_json(&1u32).unwrap())
+        .await
+        .unwrap();
+
+    consumer.receive().await.unwrap().nack().await.unwrap();
+    consumer.receive().await.unwrap().nack().await.unwrap();
+
+    assert!(consumer.try_receive().await.unwrap().is_none());
+}
+
+#[tokio::test]
+async fn try_receive_returns_none_when_empty() {
+    let (_, mut consumer) = make_pair().await;
+    assert!(consumer.try_receive().await.unwrap().is_none());
+}
+
+#[tokio::test]
+async fn batch_send_and_receive() {
+    let (producer, mut consumer) = make_pair().await;
+
+    let ids = producer
+        .send_batch(vec![
+            Message::from_json(&1u32).unwrap(),
+            Message::from_json(&2u32).unwrap(),
+            Message::from_json(&3u32).unwrap(),
+        ])
+        .await
+        .unwrap();
+    assert_eq!(ids.len(), 3);
+
+    let deliveries = consumer.receive_batch(10).await.unwrap();
+    assert_eq!(deliveries.len(), 3);
+    for d in deliveries {
+        d.ack().await.unwrap();
+    }
+}
+
+#[tokio::test]
+async fn graceful_shutdown_stops_receive_graceful() {
+    let (_producer, mut consumer) = make_pair().await;
+    let shutdown = consumer.shutdown_handle();
+
+    tokio::spawn(async move {
+        tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
+        shutdown.shutdown();
+    });
+
+    let result = tokio::time::timeout(
+        tokio::time::Duration::from_millis(200),
+        consumer.receive_graceful(),
+    )
+    .await
+    .expect("should resolve after shutdown signal");
+
+    assert!(result.is_none());
+}
+
+#[tokio::test]
+async fn scheduled_message_delayed() {
+    let (producer, mut consumer) = make_pair().await;
+
+    let delivery_at = Utc::now() + chrono::Duration::milliseconds(100);
+    producer
+        .send_at(Message::from_json(&7u32).unwrap(), delivery_at)
+        .await
+        .unwrap();
+
+    assert!(consumer.try_receive().await.unwrap().is_none());
+
+    tokio::time::sleep(tokio::time::Duration::from_millis(130)).await;
+    let d = consumer.receive().await.unwrap();
+    assert_eq!(d.payload_json::<u32>().unwrap(), 7);
+    d.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn stream_yields_messages_in_order() {
+    use futures::StreamExt;
+
+    let (producer, consumer) = make_pair().await;
+
+    for i in 0u32..5 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    let values: Vec<u32> = consumer
+        .into_stream()
+        .take(5)
+        .map(|r| r.unwrap().payload_json::<u32>().unwrap())
+        .collect()
+        .await;
+
+    assert_eq!(values, vec![0, 1, 2, 3, 4]);
+}
+
+#[tokio::test]
+async fn stream_ends_when_producer_dropped() {
+    use futures::StreamExt;
+
+    let (producer, consumer) = make_pair().await;
+
+    for i in 0u32..5 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    drop(producer);
+
+    let (values, errors): (Vec<_>, Vec<_>) = consumer
+        .into_stream()
+        .collect::<Vec<_>>()
+        .await
+        .into_iter()
+        .partition(Result::is_ok);
+
+    let values: Vec<u32> = values
+        .into_iter()
+        .map(|r| r.unwrap().payload_json::<u32>().unwrap())
+        .collect();
+
+    assert_eq!(values, vec![0, 1, 2, 3, 4], "all sent messages must arrive");
+    assert_eq!(
+        errors.len(),
+        1,
+        "exactly one ConnectionLost error closes the stream"
+    );
+}
+
+#[tokio::test]
+async fn stream_ack_removes_messages() {
+    // Each delivery pulled from the stream must be explicitly ack'd.
+    // After acking all messages the queue must be empty.
+    use futures::StreamExt;
+
+    let (producer, consumer) = make_pair().await;
+
+    for i in 0u32..5 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    let deliveries: Vec<_> = consumer
+        .into_stream()
+        .take(5)
+        .map(|r| r.unwrap())
+        .collect()
+        .await;
+
+    assert_eq!(deliveries.len(), 5);
+
+    for d in deliveries {
+        d.ack().await.unwrap();
+    }
+
+    let (_, mut check_consumer) = make_pair().await;
+    assert!(
+        check_consumer.try_receive().await.unwrap().is_none(),
+        "a separate empty queue confirms ack'd items are not requeued
+        the original stream consumer owns the channel and has drained it
+        "
+    );
+}
+
+#[tokio::test]
+async fn stream_stops_on_shutdown() {
+    // The stream must terminate cleanly when the shutdown handle fires,
+    // even if there are no messages in the queue.
+    use futures::StreamExt;
+
+    let (_producer, consumer) = make_pair().await;
+    let shutdown = consumer.shutdown_handle();
+
+    tokio::spawn(async move {
+        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
+        shutdown.shutdown();
+    });
+
+    let result = tokio::time::timeout(
+        tokio::time::Duration::from_millis(200),
+        consumer.into_stream().collect::<Vec<_>>(),
+    )
+    .await;
+
+    assert!(
+        result.is_ok(),
+        "stream should have terminated after shutdown signal"
+    );
+    assert!(
+        result.unwrap().is_empty(),
+        "no messages were sent so stream yields nothing"
+    );
+}
+
+#[tokio::test]
+async fn stream_stops_on_shutdown_after_partial_consume() {
+    // Send some messages, consume a few via the stream, then shutdown.
+    // The stream should stop mid-way; remaining messages stay in the channel.
+    use futures::StreamExt;
+
+    let (producer, consumer) = make_pair().await;
+    let shutdown = consumer.shutdown_handle();
+
+    for i in 0u32..10 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    tokio::spawn(async move {
+        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
+        shutdown.shutdown();
+    });
+
+    let received: Vec<u32> = tokio::time::timeout(
+        tokio::time::Duration::from_millis(300),
+        consumer
+            .into_stream()
+            .map(|r| r.unwrap())
+            .then(|d| async move {
+                let val = d.payload_json::<u32>().unwrap();
+                d.ack().await.unwrap();
+                val
+            })
+            .collect::<Vec<_>>(),
+    )
+    .await
+    .expect("stream should terminate after shutdown");
+
+    assert!(
+        !received.is_empty(),
+        "at least one message should have been received"
+    );
+    assert!(received.len() <= 10);
+
+    let expected: Vec<u32> = (0..u32::try_from(received.len()).unwrap()).collect();
+    assert_eq!(received, expected);
+}
+
+#[tokio::test]
+async fn stream_for_each_concurrent() {
+    // for_each_concurrent is the standard production usage pattern:
+    // N tasks processing messages in parallel. Verify all messages are
+    // received exactly once and ack'd correctly.
+    use futures::StreamExt;
+    use std::sync::{Arc, Mutex};
+
+    const MSG_COUNT: u32 = 20;
+    let (producer, consumer) = make_pair().await;
+
+    for i in 0..MSG_COUNT {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    let received = Arc::new(Mutex::new(Vec::<u32>::new()));
+    let received_clone = Arc::clone(&received);
+
+    consumer
+        .into_stream()
+        .take(MSG_COUNT as usize)
+        .for_each_concurrent(4, |result| {
+            let received_ref = Arc::clone(&received_clone);
+            async move {
+                let delivery = result.unwrap();
+                let val = delivery.payload_json::<u32>().unwrap();
+                delivery.ack().await.unwrap();
+                received_ref.lock().unwrap().push(val);
+            }
+        })
+        .await;
+
+    // Concurrent workers may push out of order; sort before comparing.
+    let mut values = received.lock().unwrap().clone();
+    values.sort_unstable();
+    let expected: Vec<u32> = (0..MSG_COUNT).collect();
+    assert_eq!(
+        values, expected,
+        "every message must be received exactly once"
+    );
+}
+
+#[tokio::test]
+async fn messages_yield_messages() {
+    use futures::StreamExt;
+
+    let (producer, mut consumer) = make_pair().await;
+
+    producer
+        .send(Message::from_json(&"hello").unwrap())
+        .await
+        .unwrap();
+
+    let mut messages = consumer.messages().await.unwrap();
+
+    let message = messages.next().await.unwrap().unwrap();
+    let value = message.payload_json::<String>().unwrap();
+    assert_eq!(value, "hello".to_string());
+}
+
+#[tokio::test]
+async fn messages_yield_messages_in_order() {
+    use futures::StreamExt;
+
+    let (producer, mut consumer) = make_pair().await;
+
+    for i in 0u32..5 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    let mut messages = consumer.messages().await.unwrap();
+
+    let mut values = Vec::new();
+    for _ in 0..5 {
+        let result = messages.next().await.unwrap().unwrap();
+        let val: u32 = result.payload_json().unwrap();
+        values.push(val);
+    }
+
+    assert_eq!(values, vec![0, 1, 2, 3, 4]);
+}
+
+#[tokio::test]
+async fn messages_end_when_producer_dropped() {
+    use futures::StreamExt;
+
+    let (producer, mut consumer) = make_pair().await;
+
+    for i in 0u32..5 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    drop(producer);
+
+    let messages = consumer.messages().await.unwrap();
+
+    let (values, errors): (Vec<_>, Vec<_>) = messages
+        .collect::<Vec<_>>()
+        .await
+        .into_iter()
+        .partition(Result::is_ok);
+
+    let values: Vec<u32> = values
+        .into_iter()
+        .map(|r| r.unwrap().payload_json::<u32>().unwrap())
+        .collect();
+
+    assert_eq!(values, vec![0, 1, 2, 3, 4], "all sent messages must arrive");
+    assert_eq!(
+        errors.len(),
+        1,
+        "exactly one ConnectionLost error closes the stream"
+    );
+}
+
+#[tokio::test]
+async fn messages_ack_removes_messages() {
+    use futures::StreamExt;
+
+    let (producer, mut consumer) = make_pair().await;
+
+    for i in 0u32..5 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    let messages = consumer.messages().await.unwrap();
+
+    let deliveries: Vec<_> = messages.take(5).map(|r| r.unwrap()).collect().await;
+
+    assert_eq!(deliveries.len(), 5);
+
+    for d in deliveries {
+        d.ack().await.unwrap();
+    }
+
+    assert!(
+        consumer.try_receive().await.unwrap().is_none(),
+        "queue should be empty after all streamed messages are ack'd"
+    );
+}
+
+#[tokio::test]
+async fn messages_stop_on_shutdown() {
+    use futures::StreamExt;
+
+    let (_producer, mut consumer) = make_pair().await;
+    let shutdown = consumer.shutdown_handle();
+
+    tokio::spawn(async move {
+        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
+        shutdown.shutdown();
+    });
+
+    let mut messages = consumer.messages().await.unwrap();
+
+    let result = tokio::time::timeout(
+        tokio::time::Duration::from_millis(200),
+        messages.by_ref().collect::<Vec<_>>(),
+    )
+    .await;
+
+    assert!(
+        result.is_ok(),
+        "messages stream should terminate after shutdown signal"
+    );
+    assert!(
+        result.unwrap().is_empty(),
+        "no messages were sent so stream yields nothing"
+    );
+}
+
+#[tokio::test]
+async fn messages_stop_on_shutdown_after_partial_consume() {
+    use futures::StreamExt;
+
+    let (producer, mut consumer) = make_pair().await;
+    let shutdown = consumer.shutdown_handle();
+
+    for i in 0u32..10 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    tokio::spawn(async move {
+        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
+        shutdown.shutdown();
+    });
+
+    let mut messages = consumer.messages().await.unwrap();
+
+    let received: Vec<u32> = tokio::time::timeout(
+        tokio::time::Duration::from_millis(300),
+        messages
+            .by_ref()
+            .map(|r| r.unwrap())
+            .then(|d| async move {
+                let val = d.payload_json::<u32>().unwrap();
+                d.ack().await.unwrap();
+                val
+            })
+            .collect::<Vec<_>>(),
+    )
+    .await
+    .expect("messages stream should terminate after shutdown");
+
+    assert!(
+        !received.is_empty(),
+        "at least one message should have been received"
+    );
+    assert!(received.len() <= 10);
+
+    let expected: Vec<u32> = (0..u32::try_from(received.len()).unwrap()).collect();
+    assert_eq!(received, expected);
+}
+
+#[tokio::test]
+async fn messages_for_each_concurrent() {
+    use futures::StreamExt;
+    use std::sync::{Arc, Mutex};
+
+    const MSG_COUNT: u32 = 20;
+    let (producer, mut consumer) = make_pair().await;
+
+    for i in 0..MSG_COUNT {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    let received = Arc::new(Mutex::new(Vec::<u32>::new()));
+    let received_clone = Arc::clone(&received);
+
+    let mut messages = consumer.messages().await.unwrap();
+
+    messages
+        .by_ref()
+        .take(MSG_COUNT as usize)
+        .for_each_concurrent(4, |result| {
+            let received_ref = Arc::clone(&received_clone);
+            async move {
+                let delivery = result.unwrap();
+                let val = delivery.payload_json::<u32>().unwrap();
+                delivery.ack().await.unwrap();
+                received_ref.lock().unwrap().push(val);
+            }
+        })
+        .await;
+
+    let mut values = received.lock().unwrap().clone();
+    values.sort_unstable();
+
+    let expected: Vec<u32> = (0..MSG_COUNT).collect();
+    assert_eq!(
+        values, expected,
+        "every message must be received exactly once"
+    );
+}
+
+#[tokio::test]
+async fn messages_stream_can_be_dropped_and_consumer_reused() {
+    use futures::StreamExt;
+
+    let (producer, mut consumer) = make_pair().await;
+
+    for i in 0u32..5 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    {
+        let mut messages = consumer.messages().await.unwrap();
+
+        let first_two: Vec<u32> = messages
+            .by_ref()
+            .take(2)
+            .map(|r| r.unwrap().payload_json::<u32>().unwrap())
+            .collect()
+            .await;
+
+        assert_eq!(first_two, vec![0, 1]);
+    }
+
+    let d = consumer.receive().await.unwrap();
+    assert_eq!(d.payload_json::<u32>().unwrap(), 2);
+    d.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn messages_stream_can_be_recreated_on_same_consumer() {
+    use futures::StreamExt;
+
+    let (producer, mut consumer) = make_pair().await;
+
+    for i in 0u32..6 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    let first_batch = {
+        let mut messages = consumer.messages().await.unwrap();
+        messages
+            .by_ref()
+            .take(3)
+            .map(|r| r.unwrap().payload_json::<u32>().unwrap())
+            .collect::<Vec<_>>()
+            .await
+    };
+
+    let second_batch = {
+        let mut messages = consumer.messages().await.unwrap();
+        messages
+            .by_ref()
+            .take(3)
+            .map(|r| r.unwrap().payload_json::<u32>().unwrap())
+            .collect::<Vec<_>>()
+            .await
+    };
+
+    assert_eq!(first_batch, vec![0, 1, 2]);
+    assert_eq!(second_batch, vec![3, 4, 5]);
+}
+
+#[tokio::test]
+async fn messages_nack_requeues_with_incremented_count() {
+    use futures::StreamExt;
+
+    let (producer, mut consumer) = make_pair().await;
+
+    producer
+        .send(Message::from_json(&99u32).unwrap())
+        .await
+        .unwrap();
+
+    let mut messages = consumer.messages().await.unwrap();
+
+    let d1 = messages.next().await.unwrap().unwrap();
+    assert_eq!(d1.delivery_count(), 1);
+    d1.nack().await.unwrap();
+
+    drop(messages);
+
+    let d2 = consumer.receive().await.unwrap();
+    assert_eq!(d2.delivery_count(), 2);
+    assert_eq!(d2.payload_json::<u32>().unwrap(), 99);
+    d2.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn messages_sees_scheduled_messages_when_due() {
+    use futures::StreamExt;
+
+    let (producer, mut consumer) = make_pair().await;
+
+    let delivery_at = Utc::now() + chrono::Duration::milliseconds(100);
+    producer
+        .send_at(Message::from_json(&7u32).unwrap(), delivery_at)
+        .await
+        .unwrap();
+
+    let mut messages = consumer.messages().await.unwrap();
+
+    let d = tokio::time::timeout(tokio::time::Duration::from_millis(300), messages.next())
+        .await
+        .expect("messages stream should yield once scheduled message becomes due")
+        .unwrap()
+        .unwrap();
+
+    assert_eq!(d.payload_json::<u32>().unwrap(), 7);
+    d.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn dyn_producer_and_consumer_work() {
+    let (producer, mut consumer) = MemoryBackend::builder(MemoryConfig::default())
+        .make_dynamic()
+        .build_pair()
+        .await
+        .unwrap();
+
+    producer
+        .send(Message::from_json(&777u32).unwrap())
+        .await
+        .unwrap();
+    let delivery = consumer.receive().await.unwrap();
+    assert_eq!(delivery.payload_json::<u32>().unwrap(), 777);
+    delivery.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn dyn_consumer_messages_work() {
+    use futures::StreamExt;
+
+    let (producer, mut consumer) = MemoryBackend::builder(MemoryConfig::default())
+        .make_dynamic()
+        .build_pair()
+        .await
+        .unwrap();
+
+    for i in 0u32..3 {
+        producer
+            .send(Message::from_json(&i).unwrap())
+            .await
+            .unwrap();
+    }
+
+    let mut messages = consumer.messages().await.unwrap();
+
+    let values: Vec<u32> = messages
+        .by_ref()
+        .take(3)
+        .map(|r| r.unwrap().payload_json::<u32>().unwrap())
+        .collect()
+        .await;
+
+    assert_eq!(values, vec![0, 1, 2]);
+}