8000 replacing command key exit with Notify · rabbitmq/rabbitmq-tutorials@03a853b · GitHub
[go: up one dir, main page]

Skip to content

Commit 03a853b

Browse files
committed
replacing command key exit with Notify
1 parent 423f0f0 commit 03a853b

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

rust-stream/src/bin/receive_offset_tracking.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use futures::StreamExt;
22
use rabbitmq_stream_client::error::StreamCreateError;
33
use rabbitmq_stream_client::types::{ByteCapacity, OffsetSpecification, ResponseCode};
4-
use std::io::stdin;
54
use std::sync::atomic::{AtomicI64, Ordering};
65
use std::sync::Arc;
6+
use tokio::sync::Notify;
77
use tokio::task;
88

99
#[tokio::main]
@@ -14,6 +14,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1414
let received_messages = Arc::new(AtomicI64::new(-1));
1515
let first_offset = Arc::new(AtomicI64::new(-1));
1616
let last_offset = Arc::new(AtomicI64::new(-1));
17+
let notify_on_close = Arc::new(Notify::new());
1718
let create_response = environment
1819
.stream_creator()
1920
.max_length(ByteCapacity::GB(2))
@@ -41,8 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4142
.await
4243
.unwrap();
4344

44-
println!("Starting consuming");
45-
println!("Press any key to close the consumer");
45+
println!("Started consuming");
4646

4747
let mut stored_offset: u64 = consumer.query_offset().await.unwrap_or_else(|_| 0);
4848

@@ -59,13 +59,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5959

6060
let first_cloned_offset = first_offset.clone();
6161
let last_cloned_offset = last_offset.clone();
62+
let notify_on_close_cloned = notify_on_close.clone();
6263

6364
task::spawn(async move {
6465
while let Some(delivery) = consumer.next().await {
6566
let d = delivery.unwrap();
6667

6768
if first_offset.load(Ordering::Relaxed) == -1 {
68-
println!("consuming first message");
69+
println!("First message received");
6970
_ = first_offset.compare_exchange(
7071
first_offset.load(Ordering::Relaxed),
7172
d.offset() as i64,
@@ -85,13 +86,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
8586
last_offset.store(d.offset() as i64, Ordering::Relaxed);
8687
let handle = consumer.handle();
8788
_ = handle.close().await;
89+
notify_on_close_cloned.notify_one();
8890

8991
}
9092
}
9193
}
9294
});
9395

94-
_ = stdin().read_line(&mut "".to_string());
96+
notify_on_close.notified().await;
9597

9698
if first_cloned_offset.load(Ordering::Relaxed) != -1 {
9799
println!(

rust-stream/src/bin/send_offset_tracking.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6060
}
6161

6262
notify_on_send.notified().await;
63-
println!("Messages confirmed.");
63+
println!("Messages confirmed: True");
6464
producer.close().await?;
6565
Ok(())
6666
}

0 commit comments

Comments
 (0)
0