diff --git a/rust-stream/Cargo.lock b/rust-stream/Cargo.lock index 2259e460..c10a2af4 100644 --- a/rust-stream/Cargo.lock +++ b/rust-stream/Cargo.lock @@ -382,13 +382,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" dependencies = [ + "hermit-abi", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -400,16 +401,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "num_enum" version = "0.7.2" @@ -551,9 +542,9 @@ dependencies = [ [[package]] name = "rabbitmq-stream-client" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88174e13a7d829a79af91437e439722bd6b095c819f7f2973c25e6ff2b94e173" +checksum = "38019b93705e3371296fe57a10740a2ea669fee1a210ad7ff647a26e8ccd697e" dependencies = [ "async-trait", "bytes", @@ -637,7 +628,7 @@ dependencies = [ "libc", "spin", "untrusted", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -748,7 +739,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -816,28 +807,27 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.37.0" +version = "1.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -1040,15 +1030,6 @@ dependencies = [ "windows-targets 0.52.5", ] -[[package]] -name = "windows-sys" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" -dependencies = [ - "windows-targets 0.48.5", -] - [[package]] name = "windows-sys" version = "0.52.0" diff --git a/rust-stream/Cargo.toml b/rust-stream/Cargo.toml index 024d3084..4da2cce0 100644 --- a/rust-stream/Cargo.toml +++ b/rust-stream/Cargo.toml @@ -6,6 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rabbitmq-stream-client = "0.4.2" -tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros"] } +rabbitmq-stream-client = "0.4.3" +tokio = { version = "1.39.0", features = ["rt", "rt-multi-thread", "macros"] } futures = "0.3.30" diff --git a/rust-stream/README.md b/rust-stream/README.md index a02e81a5..e8c8cb47 100644 --- a/rust-stream/README.md +++ b/rust-stream/README.md @@ -19,4 +19,9 @@ Each cargo command should be launched in a separate shell. #### [Tutorial one: "Hello World!"](https://www.rabbitmq.com/tutorials/tutorial-one-rust-stream.html) cargo run --bin receive - cargo run --bin send \ No newline at end of file + cargo run --bin send + +#### [Tutorial one: "Offset tracking!"](https://www.rabbitmq.com/tutorials/tutorial-one-rust-stream.html) + + cargo run --bin send_offset_tracking + cargo run --bin receive_offset_tracking \ No newline at end of file diff --git a/rust-stream/src/bin/receive_offset_tracking.rs b/rust-stream/src/bin/receive_offset_tracking.rs new file mode 100644 index 00000000..cd2fe6ea --- /dev/null +++ b/rust-stream/src/bin/receive_offset_tracking.rs @@ -0,0 +1,107 @@ +use futures::StreamExt; +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ByteCapacity, OffsetSpecification, ResponseCode}; +use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::Arc; +use tokio::sync::Notify; +use tokio::task; + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let stream = "pippo"; + let first_offset = Arc::new(AtomicI64::new(-1)); + let last_offset = Arc::new(AtomicI64::new(-1)); + let notify_on_close = Arc::new(Notify::new()); + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(2)) + .create(stream) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + std::process::exit(1); + } + } + } + } + + let stored_offset:u64 = 45; + let mut consumer = environment + .consumer() + .name("consumer-1") + .offset(OffsetSpecification::Offset(stored_offset)) + .build(stream) + .await + .unwrap(); + + println!("Started consuming"); + + /*let mut stored_offset: u64 = consumer.query_offset().await.unwrap_or_else(|_| 0); + + if stored_offset > 0 { + stored_offset += 1; + } + consumer = environment + .consumer() + .name("consumer-1") + .offset(OffsetSpecification::Offset(42)) + .build(stream) + .await + .unwrap();*/ + + let first_cloned_offset = first_offset.clone(); + let last_cloned_offset = last_offset.clone(); + let notify_on_close_cloned = notify_on_close.clone(); + + task::spawn(async move { + let mut received_messages = -1; + while let Some(delivery) = consumer.next().await { + let d = delivery.unwrap(); + + println!("offset {} ", d.offset()); + if first_offset.load(Ordering::Relaxed) == -1 { + println!("First message received"); + _ = first_offset.compare_exchange( + first_offset.load(Ordering::Relaxed), + d.offset() as i64, + Ordering::Relaxed, + Ordering::Relaxed, + ); + } + //received_messages = received_messages + 1; + if String::from_utf8_lossy(d.message().data().unwrap()).contains("marker") + { + /*let _ = consumer + .store_offset(d.offset()) + .await + .unwrap_or_else(|e| println!("Err: {}", e));*/ + if String::from_utf8_lossy(d.message().data().unwrap()).contains("marker") { + last_offset.store(d.offset() as i64, Ordering::Relaxed); + let handle = consumer.handle(); + _ = handle.close().await; + notify_on_close_cloned.notify_one(); + break; + } + } + } + }); + + notify_on_close.notified().await; + + if first_cloned_offset.load(Ordering::Relaxed) != -1 { + println!( + "Done consuming first_offset: {:?} last_offset: {:?} ", + first_cloned_offset, last_cloned_offset + ); + } + + Ok(()) +} diff --git a/rust-stream/src/bin/send_offset_tracking.rs b/rust-stream/src/bin/send_offset_tracking.rs new file mode 100644 index 00000000..bc5b8e2c --- /dev/null +++ b/rust-stream/src/bin/send_offset_tracking.rs @@ -0,0 +1,66 @@ +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ByteCapacity, Message, ResponseCode}; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use tokio::sync::Notify; + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let stream = "pippo"; + let message_count = 100; + let confirmed_messages = Arc::new(AtomicU32::new(0)); + let notify_on_send = Arc::new(Notify::new()); + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(2)) + .create(stream) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + std::process::exit(1); + } + } + } + } + + println!("Publishing {:?} messages", message_count); + + let producer = environment.producer().build(stream).await?; + + for i in 0..message_count { + let msg; + if i < message_count - 1 { + msg = Message::builder().body(format!("hello{}", i)).build(); + } else { + msg = Message::builder().body(format!("marker{}", i)).build(); + }; + + let counter = confirmed_messages.clone(); + let notifier = notify_on_send.clone(); + producer + .send(msg, move |_| { + let inner_counter = counter.clone(); + let inner_notifier = notifier.clone(); + async move { + if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 { + inner_notifier.notify_one(); + } + } + }) + .await?; + } + + notify_on_send.notified().await; + println!("Messages confirmed: True"); + producer.close().await?; + Ok(()) +}