|
| 1 | + |
| 2 | + |
| 3 | +Learning RabbitMQ, part 3 (Broadcast) |
| 4 | +===================================== |
| 5 | + |
| 6 | + |
| 7 | +<center><div class="dot_bitmap"> |
| 8 | +<img src="http://github.com/rabbitmq/rabbitmq-tutorials/raw/master/_img/ef3c2a34713ecb41b5cc2623b0c2fc34.png" alt="Dot graph" width="491" height="125" /> |
| 9 | +</div></center> |
| 10 | + |
| 11 | + |
| 12 | +In previous part of this tutorial we've learned how to create a task |
| 13 | +queue. The idea behind a task queue is that a task should be delivered |
| 14 | +to exactly one worker. In this part we'll do something completely |
| 15 | +different - we'll try to deliver a message to multiple consumers. This |
| 16 | +pattern is known as "publish-subscribe". |
| 17 | + |
| 18 | +To illustrate this this tutorial, we're going to build a simple |
| 19 | +logging system. It will consist of two programs - one will emit log |
| 20 | +messages and one will receive them. |
| 21 | + |
| 22 | +In our logging system we'll every running copy of the receiver program |
| 23 | +will be able to get the same messages. That way we'll be able to run one |
| 24 | +receiver and direct the logs to disk. In the same time we'll be able to run |
| 25 | +another reciver and see the same logs on the screen. |
| 26 | + |
| 27 | +Essentially, crated log messages are going to be broadcasted to all |
| 28 | +the receivers. |
| 29 | + |
| 30 | + |
| 31 | +Exchanges |
| 32 | +--------- |
| 33 | + |
| 34 | +In previous parts of the tutorial we've understood how to send and |
| 35 | +receive messages. Now it's time to introduce the full messaging model |
| 36 | +in Rabbit. |
| 37 | + |
| 38 | +Let's quickly remind what we've learned: |
| 39 | + |
| 40 | + * _Producer_ is a name for user application that sends messages. |
| 41 | + * _Queue_ is a buffer that stores messages. |
| 42 | + * _Consumer_ is a name for user application that receives messages. |
| 43 | + |
| 44 | + |
| 45 | +The core idea behind the messaging model in Rabbit is that the |
| 46 | +producer never sends any messages directly to the queue. Actually, |
| 47 | +quite often the producer doesn't even know that a message won't be |
| 48 | +delivered to any queue! |
| 49 | +
A3E2
|
| 50 | +Instead, the producer can only send messages to an _exchange_. An |
| 51 | +exchange is a very simple thing. On one side it receives messages from |
| 52 | +producers and the other side it pushes them to queues. The exchange |
| 53 | +must know exactly what to do with a received message. Should it be |
| 54 | +appended to a particular queue? Should it be appended to many queues? |
| 55 | +Or should it get silently discarded. The exact rules for that are |
| 56 | +defined by the _exchange type_. |
| 57 | + |
| 58 | + |
| 59 | +<center><div class="dot_bitmap"> |
| 60 | +<img src="http://github.com/rabbitmq/rabbitmq-tutorials/raw/master/_img/d8e72dacc8896f457fc9abb0d701ce84.png" alt="Dot graph" width="376" height="125" /> |
| 61 | +</div></center> |
| 62 | + |
| 63 | + |
| 64 | + |
| 65 | +There are a few exchange types available: `direct`, `topic`, |
| 66 | +`headers` and `fanout`. We'll focus on the last one - the |
| 67 | +fanout. Let's create an exchange of that type, and name it `logs`: |
| 68 | + |
| 69 | +<div><pre><code class='python'>channel.exchange_declare(exchange='logs', |
| 70 | + type='fanout')</code></pre></div> |
| 71 | + |
| 72 | + |
| 73 | +The fanout exchange is very simple. As you can probably guess from the |
| 74 | +name, it just broadcasts all the messages it receives to all the |
| 75 | +queues it knows. And that's exactly what we need for our logger. |
| 76 | + |
| 77 | + |
| 78 | +> #### Listing exchanges |
| 79 | +> |
| 80 | +> To list the exchanges on the server you can once again use the |
| 81 | +> Swiss Army Knife - `rabbitmqctl`: |
| 82 | +> |
| 83 | +> $ sudo rabbitmqctl list_exchanges |
| 84 | +> Listing exchanges ... |
| 85 | +> logs fanout |
| 86 | +> amq.direct direct |
| 87 | +> amq.topic topic |
| 88 | +> amq.fanout fanout |
| 89 | +> amq.headers headers |
| 90 | +> ...done. |
| 91 | +> |
| 92 | +> You can see a few `amq.` exchanges. They're created by default, but with a |
| 93 | +> bit of good luck you'll never need to use them. |
| 94 | +
|
| 95 | +<div></div> |
| 96 | + |
| 97 | +> #### Nameless exchange |
| 98 | +> |
| 99 | +> In previous parts of the tutorial we knew nothing about exchanges, |
| 100 | +> but still were able to send messages to queues. That was possible |
| 101 | +> because we were using a default `""` _empty string_ (nameless) exchange. |
| 102 | +> Remember how publishing worked: |
| 103 | +> |
| 104 | +> chnnel.basic_publish(exchange='', |
| 105 | +> routing_key='test', |
| 106 | +> body=message) |
| 107 | +> |
| 108 | +> The _empty string_ exchange is a special exchange: every queue is connected |
| 109 | +> to it using its queue name as a key. When you publish a message to the |
| 110 | +> nameless exchange it will be routed to the queue with name specified |
| 111 | +> by `routing_key`. |
| 112 | +
|
| 113 | + |
| 114 | + |
| 115 | +Temporary queues |
| 116 | +---------------- |
| 117 | + |
| 118 | +In previous tutorial parts we were using a queue which had a name - |
| 119 | +`test` in first `test_dur` in second tutorial. Being able to name a |
| 120 | +queue was crucial for us - we needed to point the workers to the same |
| 121 | +queue. Essentially, giving a queue a name is important when you don't |
| 122 | +want to loose any messages if the consumer disconnects. |
| 123 | + |
| 124 | +But that's not true for our logger. We do want to hear only about |
| 125 | +currently flowing log messages, we do not want to hear the old |
| 126 | +ones. To solve that problem we need two things. |
| 127 | + |
| 128 | +First, whenever we connect the queue should be new and empty. To do it |
| 129 | +we could just use random queue name, or, even better - let server to |
| 130 | +choose a random unique queue name. We can do it by not supplying the |
| 131 | +`queue` parameter to `queue_declare`: |
| 132 | + |
| 133 | +<div><pre><code class='python'>result = channel.queue_declare()</code></pre></div> |
| 134 | + |
| 135 | + |
| 136 | +At that point `result.queue` contains a random queue name. For example it |
| 137 | +may look like `amq.gen-U0srCoW8TsaXjNh73pnVAw==`. |
| 138 | + |
| 139 | +Secondly, once we disconnect the client the queue should be |
| 140 | +deleted. There's an `auto_delete` flag for that: |
| 141 | + |
| 142 | +<div><pre><code class='python'>result = channel.queue_declare(auto_delete=True)</code></pre></div> |
| 143 | + |
| 144 | + |
| 145 | + |
| 146 | +Bindings |
| 147 | +-------- |
| 148 | + |
| 149 | + |
| 150 | +<center><div class="dot_bitmap"> |
| 151 | +<img src="http://github.com/rabbitmq/rabbitmq-tutorials/raw/master/_img/45955c6ea99bae76fcb360a284cbb53b.png" alt="Dot graph" width="339" height="96" /> |
| 152 | +</div></center> |
| 153 | + |
| 154 | + |
| 155 | + |
| 156 | +We've already created a fanout exchange and a queue. Now we need to |
| 157 | +tell the exchange to send messages to our queue. That relationship, |
| 158 | +between exchange and a queue is called a _binding_. |
| 159 | + |
| 160 | +<div><pre><code class='python'>channel.queue_bind(exchange='logs', |
| 161 | + queue=result.queue)</code></pre></div> |
| 162 | + |
| 163 | + |
| 164 | +From now on the `logs` exchange will broadcast the messages also to |
| 165 | +our queue. |
| 166 | + |
| 167 | +> #### Listing bindings |
| 168 | +> |
| 169 | +> You can list existing bindings using, you guessed it, |
| 170 | +> `rabbitmqctl list_bindings` command. |
| 171 | +
|
| 172 | + |
| 173 | +Putting it all together |
| 174 | +----------------------- |
| 175 | + |
| 176 | + |
| 177 | +<center><div class="dot_bitmap"> |
| 178 | +<img src="http://github.com/rabbitmq/rabbitmq-tutorials/raw/master/_img/b78d29ae746c8faafb655eb616173f68.png" alt="Dot graph" width="347" height="173" /> |
| 179 | +</div></center> |
| 180 | + |
| 181 | + |
| 182 | +The producer program, which emits log messages, doesn't look much |
| 183 | +different than in previous tutorial. The most important change is |
| 184 | +that, we now need to publish messages to `logs` exchange instead of |
| 185 | +the nameless one. We need to supply the `routing_key` parameter, but |
| 186 | +it's value is ignored for `fanout` exchanges. Here goes the code for |
| 187 | +`emit_log.py` script: |
| 188 | + |
| 189 | +<div><pre><code class='python'>#!/usr/bin/env python |
| 190 | +import pika |
| 191 | +import sys |
| 192 | + |
| 193 | +connection = pika.AsyncoreConnection(pika.ConnectionParameters( |
| 194 | + host='127.0.0.1', |
| 195 | + credentials=pika.PlainCredentials('guest', 'guest'))) |
| 196 | +channel = connection.channel() |
| 197 | + |
| 198 | +message = ' '.join(sys.argv[1:]) or "info: Hello World!" |
| 199 | +channel.basic_publish(exchange='logs', |
| 200 | + routing_key='', |
| 201 | + body=message) |
| 202 | +print " [x] Sent %r" % (message,)</code></pre></div> |
| 203 | + |
| 204 | + |
| 205 | +As you see, we avoided declaring exchange. If the `logs` exchange |
| 206 | +isn't created at the time this code is executed the message will be |
| 207 | +lost. That's okay for us. |
| 208 | + |
| 209 | +The code for `receive_logs.py`: |
| 210 | + |
| 211 | +<div><pre><code class='python'>#!/usr/bin/env python |
| 212 | +import pika |
| 213 | + |
| 214 | +connection = pika.AsyncoreConnection(pika.ConnectionParameters( |
| 215 | + host='127.0.0.1', |
| 216 | + credentials=pika.PlainCredentials('guest', 'guest'))) |
| 217 | +channel = connection.channel() |
| 218 | + |
| 219 | +channel.exchange_declare(exchange='logs', |
| 220 | + type='fanout') |
| 221 | + |
| 222 | +result = channel.queue_declare(auto_delete=True) |
| 223 | +queue_name = result.queue |
| 224 | + |
| 225 | +channel.queue_bind(exchange='logs', |
| 226 | + queue=queue_name) |
| 227 | + |
| 228 | +print ' [*] Waiting for logs. To exit press CTRL+C' |
| 229 | + |
| 230 | +def callback(ch, method, header, body): |
| 231 | + print " [x] %r" % (body,) |
| 232 | + |
| 233 | +channel.basic_consume(callback, |
| 234 | + queue=queue_name, |
| 235 | + no_ack=True) |
| 236 | + |
| 237 | +pika.asyncore_loop()</code></pre></div> |
| 238 | + |
| 239 | + |
| 240 | + |
| 241 | +We're done. If you want to save logs to a file, just open a console and type: |
| 242 | + |
| 243 | + $ ./receive_logs.py > logs_from_rabbit.log |
| 244 | + |
| 245 | +If you wish to see the logs on your screen, spawn a new terminal and run: |
| 246 | + |
| 247 | + $ ./receive_logs.py |
| 248 | + |
| 249 | + |
| 250 | + |
0 commit comments