8000 tutorial2: rust implementation · rabbitmq/rabbitmq-tutorials@423f0f0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 423f0f0

Browse files
committed
tutorial2: rust implementation
1 parent 757bc5e commit 423f0f0

File tree

5 files changed

+188
-32
lines changed

5 files changed

+188
-32
lines changed

rust-stream/Cargo.lock

Lines changed: 11 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust-stream/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ edition = "2021"
77

88
[dependencies]
99
rabbitmq-stream-client = "0.4.2"
10-
tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros"] }
10+
tokio = { version = "1.39.0", features = ["rt", "rt-multi-thread", "macros"] }
1111
futures = "0.3.30"

rust-stream/README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,9 @@ Each cargo command should be launched in a separate shell.
1919
#### [Tutorial one: "Hello World!"](https://www.rabbitmq.com/tutorials/tutorial-one-rust-stream.html)
2020

2121
cargo run --bin receive
22-
cargo run --bin send
22+
cargo run --bin send
23+
24+
#### [Tutorial one: "Offset tracking!"](https://www.rabbitmq.com/tutorials/tutorial-one-rust-stream.html)
25+
26+
cargo run --bin send_offset_tracking
27+
cargo run --bin receive_offset_tracking
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use futures::StreamExt;
2+
use rabbitmq_stream_client::error::StreamCreateError;
3+
use rabbitmq_stream_client::types::{ByteCapacity, OffsetSpecification, ResponseCode};
4+
use std::io::stdin;
5+
use std::sync::atomic::{AtomicI64, Ordering};
6+
use std::sync::Arc;
7+
use tokio::task;
8+
9+
#[tokio::main]
10+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
11+
use rabbitmq_stream_client::Environment;
12+
let environment = Environment::builder().build().await?;
13+
let stream = "stream-offset-tracking-rust";
14+
let received_messages = Arc::new(AtomicI64::new(-1));
15+
let first_offset = Arc::new(AtomicI64::new(-1));
16+
let last_offset = Arc::new(AtomicI64::new(-1));
17+
let create_response = environment
18+
.stream_creator()
19+
.max_length(ByteCapacity::GB(2))
20+
.create(stream)
21+
.await;
22+
23+
if let Err(e) = create_response {
24+
if let StreamCreateError::Create { stream, status } = e {
25+
match status {
26+
// we can ignore this error because the stream already exists
27+
ResponseCode::StreamAlreadyExists => {}
28+
err => {
29+
println!("Error creating stream: {:?} {:?}", stream, err);
30+
std::process::exit(1);
31+
}
32+
}
33+
}
34+
}
35+
36+
let mut consumer = environment
37+
.consumer()
38+
.name("consumer-1")
39+
.offset(OffsetSpecification::First)
40+
.build(stream)
41+
.await
42+
.unwrap();
43+
44+
println!("Starting consuming");
45+
println!("Press any key to close the consumer");
46+
47+
let mut stored_offset: u64 = consumer.query_offset().await.unwrap_or_else(|_| 0);
48+
49+
if stored_offset > 0 {
50+
stored_offset += 1;
51+
}
52+
consumer = environment
53+
.consumer()
54+
.name("consumer-1")
55+
.offset(OffsetSpecification::Offset(stored_offset))
56+
.build(stream)
57+
.await
58+
.unwrap();
59+
60+
let first_cloned_offset = first_offset.clone();
61+
let last_cloned_offset = last_offset.clone();
62+
63+
task::spawn(async move {
64+
while let Some(delivery) = consumer.next().await {
65+
let d = delivery.unwrap();
66+
67+
if first_offset.load(Ordering::Relaxed) == -1 {
68+
println!("consuming first message");
69+
_ = first_offset.compare_exchange(
70+
first_offset.load(Ordering::Relaxed),
71+
d.offset() as i64,
72+
Ordering::Relaxed,
73+
Ordering::Relaxed,
74+
);
75+
}
76+
77+
if received_messages.fetch_add(1, Ordering::Relaxed) % 10 == 0
78+
|| String::from_utf8_lossy(d.message().data().unwrap()).contains("marker")
79+
{
80+
let _ = consumer
81+
.store_offset(d.offset())
82+
.await
83+
.unwrap_or_else(|e| println!("Err: {}", e));
84+
if String::from_utf8_lossy(d.message().data().unwrap()).contains("marker") {
85+
last_offset.store(d.offset() as i64, Ordering::Relaxed);
86+
let handle = consumer.handle();
87+
_ = handle.close().await;
88+
89+
}
90+
}
91+
}
92+
});
93+
94+
_ = stdin().read_line(&mut "".to_string());
95+
96+
if first_cloned_offset.load(Ordering::Relaxed) != -1 {
97+
println!(
98+
"Done consuming first_offset: {:?} last_offset: {:?} ",
99+
first_cloned_offset, last_cloned_offset
100+
);
101+
}
102+
103+
Ok(())
104+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use rabbitmq_stream_client::error::StreamCreateError;
2+
use rabbitmq_stream_client::types::{ByteCapacity, Message, ResponseCode};
3+
use std::sync::atomic::{AtomicU32, Ordering};
4+
use std::sync::Arc;
5+
use tokio::sync::Notify;
6+
7+
#[tokio::main]
8+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
9+
use rabbitmq_stream_client::Environment;
10+
let environment = Environment::builder().build().await?;
11+
let stream = "stream-offset-tracking-rust";
12+
let message_count = 100;
13+
let confirmed_messages = Arc::new(AtomicU32::new(0));
14+
let notify_on_send = Arc::new(Notify::new());
15+
16+
let create_response = environment
17+
.stream_creator()
18+
.max_length(ByteCapacity::GB(2))
19+
.create(stream)
20+
.await;
21+
22+
if let Err(e) = create_response {
23+
if let StreamCreateError::Create { stream, status } = e {
24+
match status {
25+
// we can ignore this error because the stream already exists
26+
ResponseCode::StreamAlreadyExists => {}
27+
err => {
28+
println!("Error creating stream: {:?} {:?}", stream, err);
29+
std::process::exit(1);
30+
}
31+
}
32+
}
33+
}
34+
35+
println!("Publishing {:?} messages", message_count);
36+
37+
let producer = environment.producer().build(stream).await?;
38+
39+
for i in 0..message_count {
40+
let msg;
41+
if i < message_count - 1 {
42+
msg = Message::builder().body(format!("hello{}", i)).build();
43+
} else {
44+
msg = Message::builder().body(format!("marker{}", i)).build();
45+
};
46+
47+
let counter = confirmed_messages.clone();
48+
let notifier = notify_on_send.clone();
49+
producer
50+
.send(msg, move |_| {
51+
let inner_counter = counter.clone();
52+
let inner_notifier = notifier.clone();
53+
async move {
54+
if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
55+
inner_notifier.notify_one();
56+
}
57+
}
58+
})
59+
.await?;
60+
}
61+
62+
notify_on_send.notified().await;
63+
println!("Messages confirmed.");
64+
producer.close().await?;
65+
Ok(())
66+
}

0 commit comments

Comments
 (0)
0