8000 First cut of python/3 · liborange/rabbitmq-tutorials@1887d5b · GitHub
[go: up one dir, main page]

Skip to content

Commit 1887d5b

Browse files
committed
First cut of python/3
1 parent b45ca3b commit 1887d5b

File tree

3 files changed

+339
-0
lines changed

3 files changed

+339
-0
lines changed

python/emit_log.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/usr/bin/env python
2+
import pika
3+
import sys
4+
5+
connection = pika.AsyncoreConnection(pika.ConnectionParameters(
6+
host='127.0.0.1',
7+
credentials=pika.PlainCredentials('guest', 'guest')))
8+
channel = connection.channel()
9+
10+
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
11+
channel.basic_publish(exchange='logs',
12+
routing_key='',
13+
body=message)
14+
print " [x] Sent %r" % (message,)

python/receive_logs.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#!/usr/bin/env python
2+
import pika
3+
4+
connection = pika.AsyncoreConnection(pika.ConnectionParameters(
5+
host='127.0.0.1',
6+
credentials=pika.PlainCredentials('guest', 'guest')))
7+
channel = connection.channel()
8+
9+
channel.exchange_declare(exchange='logs',
10+
type='fanout')
11+
12+
result = channel.queue_declare(auto_delete=True)
13+
queue_name = result.queue
14+
15+
channel.queue_bind(exchange='logs',
16+
queue=queue_name)
17+
18+
print ' [*] Waiting for logs. To exit press CTRL+C'
19+
20+
def callback(ch, method, header, body):
21+
print " [x] %r" % (body,)
22+
23+
channel.basic_consume(callback,
24+
queue=queue_name,
25+
no_ack=True)
26+
27+
pika.asyncore_loop()

python/tutorial-three.mdx

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
2+
Learning RabbitMQ, part 3 (Broadcast)
3+
=====================================
4+
5+
{% dot -Gsize="10,1.3" -Grankdir=LR %}
6+
digraph G {
7+
P1 [label=<P>, {{ dotstyle.producer }}];
8+
X [label="X", {{ dotstyle.exchange }}];
9+
Q1 [label="{<s>||||<e>}", {{ dotstyle.queue }}];
10+
Q2 [label="{<s>||||<e>}", {{ dotstyle.queue }}];
11+
C1 [label=<C<font point-size="7">1</font>>, {{ dotstyle.consumer }}];
12+
C2 [label=<C<font point-size="7">2</font>>, {{ dotstyle.consumer }}];
13+
14+
P1 -> X;
15+
X -> Q1;
16+
X -> Q2;
17+
Q1 -> C1;
18+
Q2 -> C2;
19+
}
20+
{% enddot %}
21+
22+
In previous part of this tutorial we've learned how to create a task
23+
queue. The idea behind a task queue is that a task should be delivered
24+
to exactly one worker. In this part we'll do something completely
25+
different - we'll try to deliver a message to multiple consumers. This
26+
pattern is known as "publish-subscribe".
27+
28+
To illustrate this this tutorial, we're going to build a simple
29+
logging system. It will consist of two programs - one will emit log
30+
messages and one will receive them.
31+
32+
In our logging system we'll every running copy of the receiver program
33+
will be able to get the same messages. That way we'll be able to run one
34+
receiver and direct the logs to disk. In the same time we'll be able to run
35+
another reciver and see the same logs on the screen.
36+
37+
Essentially, crated log messages are going to be broadcasted to all
38+
the receivers.
39+
40+
41+
Exchanges
42+
---------
43+
44+
In previous parts of the tutorial we've understood how to send and
45+
receive messages. Now it's time to introduce the full messaging model
46+
in Rabbit.
47+
48+
Let's quickly remind what we've learned:
49+
50+
* _Producer_ is a name for user application that sends messages.
51+
* _Queue_ is a buffer that stores messages.
52+
* _Consumer_ is a name for user application that receives messages.
53+
54+
55+
The core idea behind the messaging model in Rabbit is that the
56+
producer never sends any messages directly to the queue. Actually,
57+
quite often the producer doesn't even know that a message won't be
58+
delivered to any queue!
59+
60+
Instead, the producer can only send messages to an _exchange_. An
61+
exchange is a very simple thing. On one side it receives messages from
62+
producers and the other side it pushes them to queues. The exchange
63+
must know exactly what to do with a received message. Should it be
64+
appended to a particular queue? Should it be appended to many queues?
65+
Or should it get silently discarded. The exact rules for that are
66+
defined by the _exchange type_.
67+
68+
{% dot -Gsize="10,1.3" -Grankdir=LR %}
69+
digraph G {
70+
P1 [label=<P>, {{ dotstyle.producer }}];
71+
X [label="X", {{ dotstyle.exchange }}];
72+
Q1 [label="{<s>||||<e>}", {{ dotstyle.queue }}];
73+
Q2 [label="{<s>||||<e>}", {{ dotstyle.queue }}];
74+
75+
P1 -> X;
76+
X -> Q1;
77+
X -> Q2;
78+
}
79+
{% enddot %}
80+
81+
82+
There are a few exchange types available: `direct`, `topic`,
83+
`headers` and `fanout`. We'll focus on the last one - the
84+
fanout. Let's create an exchange of that type, and name it `logs`:
85+
86+
{% highlight python %}
87+
channel.exchange_declare(exchange='logs',
88+
type='fanout')
89+
{% endhighlight %}
90+
91+
The fanout exchange is very simple. As you can probably guess from the
92+
name, it just broadcasts all the messages it receives to all the
93+
queues it knows. And that's exactly what we need for our logger.
94+
95+
96+
> #### Listing exchanges
97+
>
98+
> To list the exchanges on the server you can once again use the
99+
> Swiss Army Knife - `rabbitmqctl`:
100+
>
101+
> $ sudo rabbitmqctl list_exchanges
102+
> Listing exchanges ...
103+
> logs fanout
104+
> amq.direct direct
105+
> amq.topic topic
106+
> amq.fanout fanout
107+
> amq.headers headers
108+
> ...done.
109+
>
110+
> You can see a few `amq.` exchanges. They're created by default, but with a
111+
> bit of good luck you'll never need to use them.
112+
113+
<div></div>
114+
115+
> #### Nameless exchange
116+
>
117+
> In previous parts of the tutorial we knew nothing about exchanges,
118+
> but still were able to send messages to queues. That was possible
119+
> because we were using a default `""` _empty string_ (nameless) exchange.
120+
> Remember how publishing worked:
121+
>
122+
> chnnel.basic_publish(exchange='',
123+
> routing_key='test',
124+
> body=message)
125+
>
126+
> The _empty string_ exchange is a special exchange: every queue is connected
127+
> to it using its queue name as a key. When you publish a message to the
128+
> nameless exchange it will be routed to the queue with name specified
129+
> by `routing_key`.
130+
131+
132+
133+
Temporary queues
134+
----------------
135+
136+
In previous tutorial parts we were using a queue which had a name -
137+
`test` in first `test_dur` in second tutorial. Being able to name a
138+
queue was crucial for us - we needed to point the workers to the same
139+
queue. Essentially, giving a queue a name is important when you don't
140+
want to loose any messages if the consumer disconnects.
141+
142+
But that's not true for our logger. We do want to hear only about
143+
currently flowing log messages, we do not want to hear the old
144+
ones. To solve that problem we need two things.
145+
146+
First, whenever we connect the queue should be new and empty. To do it
147+
we could just use random queue name, or, even better - let server to
148+
choose a random unique queue name. We can do it by not supplying the
149+
`queue` parameter to `queue_declare`:
150+
151+
{% highlight python %}
152+
result = channel.queue_declare()
153+
{% endhighlight %}
154+
155+
At that point `result.queue` contains a random queue name. For example it
156+
may look like `amq.gen-U0srCoW8TsaXjNh73pnVAw==`.
157+
158+
Secondly, once we disconnect the client the queue should be
159+
deleted. There's an `auto_delete` flag for that:
160+
161+
{% highlight python %}
162+
result = channel.queue_declare(auto_delete=True)
163+
{% endhighlight %}
164+
165+
166+
Bindings
167+
--------
168+
169+
{% dot -Gsize="10,1.0" -Grankdir=LR %}
170+
digraph G {
171+
P1 [label=<P>, {{ dotstyle.producer }}];
172+
X [label="X", {{ dotstyle.exchange }}];
173+
Q1 [label="{<s>||||<e>}", {{ dotstyle.queue }}];
174+
Q2 [label="{<s>||||<e>}", {{ dotstyle.queue }}];
175+
176+
P1 -> X;
177+
X -> Q1 [label="binding"];
178+
X -> Q2 [label="binding"];
179+
}
180+
{% enddot %}
181+
182+
183+
We've already created a fanout exchange and a queue. Now we need to
184+
tell the exchange to send messages to our queue. That relationship,
185+
between exchange and a queue is called a _binding_.
186+
187+
{% highlight python %}
188+
channel.queue_bind(exchange='logs',
189+
queue=result.queue)
190+
{% endhighlight %}
191+
192+
From now on the `logs` exchange will broadcast the messages also to
193+
our queue.
194+
195+
> #### Listing bindings
196+
>
197+
> You can list existing bindings using, you guessed it,
198+
> `rabbitmqctl list_bindings` command.
199+
200+
201+
Putting it all together
202+
-----------------------
203+
204+
{% dot -Gsize="10,1.8" -Grankdir=LR %}
205+
digraph G {
206+
P [label=<P>, {{ dotstyle.producer }}];
207+
X [label=<logs>, {{ dotstyle.exchange }}];
208+
subgraph cluster_Q1 {
209+
label="amq.gen-RQ6...";
210+
color=transparent;
211+
Q1 [label="{<s>||||<e>}", {{ dotstyle.queue }}];
212+
};
213+
subgraph cluster_Q2 {
214+
label="amq.gen-As8...";
215+
color=transparent;
216+
Q2 [label="{<s>||||<e>}", {{ dotstyle.queue }}];
217+
};
218+
C1 [label=<C<font point-size="7">1</font>>, {{ dotstyle.consumer }}];
219+
C2 [label=<C<font point-size="7">2</font>>, {{ dotstyle.consumer }}];
220+
221+
P -> X;
222+
X -> Q1;
223+
X -> Q2;
224+
Q1 -> C1;
225+
Q2 -> C2;
226+
}
227+
{% enddot %}
228+
229+
The producer program, which emits log messages, doesn't look much
230+
different than in previous tutorial. The most important change is
231+
that, we now need to publish messages to `logs` exchange instead of
232+
the nameless one. We need to supply the `routing_key` parameter, but
233+
it's value is ignored for `fanout` exchanges. Here goes the code for
234+
`emit_log.py` script:
235+
236+
{% highlight python linenos=true %}
237+
#!/usr/bin/env python
238+
import pika
239+
import sys
240+
241+
connection = pika.AsyncoreConnection(pika.ConnectionParameters(
242+
host='127.0.0.1',
243+
credentials=pika.PlainCredentials('guest', 'guest')))
244+
channel = connection.channel()
245+
246+
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
247+
channel.basic_publish(exchange='logs',
248+
routing_key='',
249+
body=message)
250+
print " [x] Sent %r" % (message,)
251+
{% endhighlight %}
252+
253+
As you see, we avoided declaring exchange. If the `logs` exchange
254+
isn't created at the time this code is executed the message will be
255+
lost. That's okay for us.
256+
257+
The code for `receive_logs.py`:
258+
259+
{% highlight python linenos=true %}
260+
#!/usr/bin/env python
261+
import pika
262+
263+
connection = pika.AsyncoreConnection(pika.ConnectionParameters(
264+
host='127.0.0.1',
265+
credentials=pika.PlainCredentials('guest', 'guest')))
266+
channel = connection.channel()
267+
268+
channel.exchange_declare(exchange='logs',
269+
type='fanout')
270+
271+
result = channel.queue_declare(auto_delete=True)
272+
queue_name = result.queue
273+
274+
channel.queue_bind(exchange='logs',
275+
queue=queue_name)
276+
277+
print ' [*] Waiting for logs. To exit press CTRL+C'
278+
279+
def callback(ch, method, header, body):
280+
print " [x] %r" % (body,)
281+
282+
channel.basic_consume(callback,
283+
queue=queue_name,
284+
no_ack=True)
285+
286+
pika.asyncore_loop()
287+
{% endhighlight %}
288+
289+
290+
We're done. If you want to save logs to a file, just open a console and type:
291+
292+
$ ./receive_logs.py > logs_from_rabbit.log
293+
294+
If you wish to see the logs on your screen, spawn a new terminal and run:
295+
296+
$ ./receive_logs.py
297+
298+

0 commit comments

Comments
 (0)
0