|
| 1 | +Hello. |
| 2 | + |
| 3 | +I'm happy and excited to share with you 2 new components. |
| 4 | + |
| 5 | +note: The PR description (what you are currently reading) is also committed (as |
| 6 | +`pr.body.md`). I will remove it just before the merge. Like that you could also |
| 7 | +ask question about the "documentation". But please, don't over-comment the |
| 8 | +"language / English". This part of the job will be done in the doc repository. |
| 9 | + |
| 10 | +### AMQP |
| 11 | + |
| 12 | +It is a library created at @sensiolabs few years ago (Mon Mar 18 17:26:01 2013 +0100). |
| 13 | +Its goal is to ease the communication with a service that implement [AMQP](https://fr.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol) |
| 14 | +For example, [RabbitMQ](http://www.rabbitmq.com/) implements AMQP. |
| 15 | + |
| 16 | +At that time, [Swarrot](https://github.com/swarrot/swarrot) did not exist yet |
| 17 | +and only [php-amqplib](https://github.com/php-amqplib/php-amqplib) existed. |
| 18 | + |
| 19 | +We started by using php-amqplib but we faced many issues: memory leak, bad |
| 20 | +handling of signal, poor documentation |
| 21 | + |
| 22 | +So we decided to stop using it and to build our own library. Over the years, we |
| 23 | +added very nice features, we fixed very weird edge case and we gain real |
| 24 | +expertise on AMQP. |
| 25 | + |
| 26 | +Nowadays, it's very common to use AMQP in a web / CLI project. |
| 27 | + |
| 28 | +So four years later, we decided to open-source it and to add it to Symfony to |
| 29 | +leverage the Symfony Ecosystem (code quality, release process, documentation, |
| 30 | +visibility, community, etc.) |
| 31 | + |
| 32 | +So basically it's an abstraction of the [AMQP pecl](https://github.com/pdezwart/php-amqp/). |
| 33 | + |
| 34 | +Here is the README.rst we had for this lib. I have updated it to match the |
| 35 | +version that will land in Symfony |
| 36 | + |
| 37 | +<details> |
| 38 | +<summary>The old README (but updated)</summary> |
| 39 | + |
| 40 | +Symfony AMQP |
| 41 | +============ |
| 42 | + |
| 43 | +Fed up of writing the same boiler-plate code over and over again whenever you |
| 44 | +need to use your favorite AMQP broker? Have you a hard time remembering how to |
| 45 | +publish a message or how to wire exchanges and queues? I had the exact same |
| 46 | +feeling. There are many AMQP libraries providing a very good low-level access to |
| 47 | +the AMQP protocol, but what about providing a simple API for abstracting the |
| 48 | +most common use cases? This library gives you an opinionated way of using any |
| 49 | +AMQP brokers and it also provides a nice and consistent API for low-level |
| 50 | +interaction with any AMQP brokers. |
| 51 | + |
| 52 | +Dependencies |
| 53 | +------------ |
| 54 | + |
| 55 | +This library depends on the ``amqp`` PECL extensions (version 1.4.0-beta2 or |
| 56 | +later):: |
| 57 | + |
| 58 | + sudo apt-get install php-amqp |
| 59 | + |
| 60 | +Using the Conventions |
| 61 | +--------------------- |
| 62 | + |
| 63 | +The simplest usage of an AMQP broker is sending a message that is consumed by |
| 64 | +another script:: |
| 65 | + |
| 66 | + use Symfony\Component\Amqp\Broker; |
| 67 | + |
| 68 | + // connects to a local AMQP broker by default |
| 69 | + $broker = new Broker(); |
| 70 | + |
| 71 | + // publish a message on the 'log' queue |
| 72 | + $broker->publish('log', 'some message'); |
| 73 | + |
| 74 | + // in another script (non-blocking) |
| 75 | + // $message is false if no messages are in the queue |
| 76 | + $message = $broker->get('log'); |
| 77 | + |
| 78 | + // blocking (waits for a message to be available in the queue) |
| 79 | + $message = $broker->consume('log'); |
| 80 | + |
| 81 | +The example above is based on some "conventions" and as such makes the |
| 82 | +following assumptions: |
| 83 | + |
| 84 | +* A default exchange is used to publish the message (named |
| 85 | + ``symfony.default``); |
| 86 | + |
| 87 | +* The routing is done via the routing key (``log`` in this example); |
| 88 | + |
| 89 | +* Queues and exchanges are created implicitly when first accessed; |
| 90 | + |
| 91 | +* The connection to the broker is done lazily whenever a message must be sent |
| 92 | + or received. |
| 93 | + |
| 94 | +Retrying a Message |
| 95 | +------------------ |
| 96 | + |
| 97 | +Retrying processing a message when an error occurs is as easy as defining a |
| 98 | +retry strategy on a queue:: |
| 99 | + |
| 100 | + use Symfony\Component\Amqp\RetryStrategy\ConstantRetryStrategy; |
| 101 | + |
| 102 | + // configure the queue explicitly |
| 103 | + $broker->createQueue('log', array( |
| 104 | + // retry every 5 seconds |
| 105 | + 'retry_strategy' => new ConstantRetryStrategy(5), |
| 106 | + )); |
| 107 | + |
| 108 | +Whenever you ``$broker->retry()`` a message, it is going to be automatically re- |
| 109 | +enqueued after a ``5`` seconds wait for a retry. |
| 110 | + |
| 111 | +You can also drop the message after a limited number of retries (``2`` in the |
| 112 | +following example):: |
| 113 | + |
| 114 | + $broker->createQueue('log', array( |
| 115 | + // retry 2 times |
| 116 | + 'retry_strategy' => new ConstantRetryStrategy(5, 2), |
| 117 | + )); |
| 118 | + |
| 119 | +Instead of trying every ``n`` seconds, you can also use a retry mechanism based |
| 120 | +on a truncated exponential backoff algorithm:: |
| 121 | + |
| 122 | + use Symfony\Component\Amqp\RetryStrategy\ExponentialRetryStrategy; |
| 123 | + |
| 124 | + $broker->createQueue('log', array( |
| 125 | + // retry 5 times |
| 126 | + 'retry_strategy' => new ExponentialRetryStrategy(5), |
| 127 | + )); |
| 128 | + |
| 129 | +The message will be re-enqueued after 1 second the first time you call |
| 130 | +``retry()``, ``2^1`` seconds the second
F438
time, ``2^2`` seconds the third time, |
| 131 | +and ``2^n`` seconds the nth time. If you want to wait more than 1 second the |
| 132 | +first time, you can pass an offset:: |
| 133 | + |
| 134 | + $broker->createQueue('log', array( |
| 135 | + // starts at 2^3 |
| 136 | + 'retry_strategy' => new ExponentialRetryStrategy(5, 3), |
| 137 | + )); |
| 138 | + |
| 139 | +.. note:: |
| 140 | + |
| 141 | + The retry strategies are implemented by using the dead-lettering feature of |
| 142 | + AMQP. Behind the scene, a special exchange is bound to queues configured |
| 143 | + based on the retry strategy you set. |
| 144 | + |
| 145 | +.. note:: |
| 146 | + Don't forget to ``ack`` or ``nack`` your message if you retry it. And |
| 147 | + obviously you should not use the AMQP_Requeue flag. |
| 148 | + |
| 149 | +Configuring a Broker |
| 150 | +-------------------- |
| 151 | + |
| 152 | +By default, a broker tries to connect to a local AMQP broker with the default |
| 153 | +port, username, and password. If you have a different setting, pass a URI to |
| 154 | +the ``Broker`` constructor:: |
| 155 | + |
| 156 | + $broker = new Broker('amqp://user:pass@10.1.2.3:345/some-vhost'); |
| 157 | + |
| 158 | +Configuring an Exchange |
| 159 | +----------------------- |
| 160 | + |
| 161 | +The default exchange used by the library is of type ``direct``. You can also |
| 162 | +create your own exchange:: |
| 163 | + |
| 164 | + // define a new fanout exchange |
| 165 | + $broker->createExchange('sensiolabs.fanout', array('type' => \AMQP_EX_TYPE_FANOUT)); |
| 166 | + |
| 167 | +You can then binding a queue to this named exchange easily:: |
| 168 | + |
| 169 | + $broker->createQueue('logs', array('exchange' => 'sensiolabs.fanout', 'routing_keys' => null)); |
| 170 | + $broker->createQueue('logs.again', array('exchange' => 'sensiolabs.fanout', 'routing_keys' => null)); |
| 171 | + |
| 172 | +The second argument of ``createExchange()`` takes an array of arguments passed |
| 173 |
10000
+to the exchange. The following keys are used to further configure the exchange: |
| 174 | + |
| 175 | +* ``flags``: Sets the exchange flags; |
| 176 | + |
| 177 | +* ``type``: Sets the type of the queue (see ``\AMQP_EX_TYPE_*`` constants). |
| 178 | + |
| 179 | +.. note:: |
| 180 | + |
| 181 | + Note that ``createExchange()`` automatically declares the exchange. |
| 182 | + |
| 183 | +Configuring a Queue |
| 184 | +------------------- |
| 185 | + |
| 186 | +As demonstrated in some examples, you can create your own queue. As for the |
| 187 | +exchange, the second argument of the ``createQueue()`` method is a list of |
| 188 | +queue arguments; the following keys are used to further configure the queue: |
| 189 | + |
| 190 | +* ``exchange``: The exchange name to bind the queue to (the default exchange is |
| 191 | + used if not set); |
| 192 | + |
| 193 | +* ``flags``: Sets the exchange flags; |
| 194 | + |
| 195 | +* ``bind_arguments``: An array of arguments to pass when binding the queue with |
| 196 | + an exchange; |
| 197 | + |
| 198 | +* ``retry_strategy``: The retry strategy to use (an instance of |
| 199 | + ``RetryStrategyInterface``). |
| 200 | + |
| 201 | +.. note:: |
| 202 | + |
| 203 | + Note that ``createQueue()`` automatically declares and binds the queue. |
| 204 | + |
| 205 | +Implementation details |
| 206 | +---------------------- |
| 207 | + |
| 208 | +The retry strategy |
| 209 | +.................. |
| 210 | + |
| 211 | +The retry strategy is implemented with two customs and privates exchanges: |
| 212 | +``symfony.dead_letter`` and ``symfony.retry``. |
| 213 | + |
| 214 | +Calling ``Broker::retry`` will publish the same message in the |
| 215 | +``symfony.dead_letter`` exchange. |
| 216 | + |
| 217 | +This exchange will route the message to a queue named like |
| 218 | +``%exchange%.%time%.wait``. For example ``sensiolabs.default.000005.wait``. This |
| 219 | +queue has a TTL of 5 seconds. It means that if nothing consumes this message, it |
| 220 | +will be dropped after 5 seconds. But this queue has also a Dead Letter (DL). It |
| 221 | +means that instead of dropping the message, the AMQP server will re-publish |
| 222 | +automatically the message to the Exchange configured as DL. |
| 223 | + |
| 224 | +After 5 seconds the message will be re-published to ``symfony.retry`` Exchange. |
| 225 | +This exchange is bound with every single queues. Finally, the message will land |
| 226 | +in the original queue. |
| 227 | + |
| 228 | +</details> |
| 229 | + |
| 230 | +### Worker |
| 231 | + |
| 232 | +The second component was extracted from our internal SensioLabsAmqp component. |
| 233 | +We extracted it as is decoupled from the AMQP component. Thus it could be used, |
| 234 | +for example, to write redis, kafka daemon. |
| 235 | + |
| 236 | +<details> |
| 237 | +<summary>Documentation</summary> |
| 238 | + |
| 239 | +Symfony Worker |
| 240 | +============== |
| 241 | + |
| 242 | +The worker component help you to write simple but flexible daemon. |
| 243 | + |
| 244 | +Introduction |
| 245 | +------------ |
| 246 | + |
| 247 | +First you need something that ``fetch`` some messages. If the message are sent |
| 248 | +to AMQP, you should use the ``AmqpMessageFetcher``:: |
| 249 | + |
| 250 | + use Symfony\Component\Amqp\Broker; |
| 251 | + use Symfony\Component\Worker\MessageFetcher\AmqpMessageFetcher; |
| 252 | + |
| 253 | + $broker = new Broker(); |
| 254 | + $fetcher = new AmqpMessageFetcher($broker, 'queue_name'); |
| 255 | + |
| 256 | +Then you need a Consumer that will ``consumer`` each AMQP message:: |
| 257 | + |
| 258 | + namespace AppBundle\Consumer; |
| 259 | + |
| 260 | + use Symfony\Component\Amqp\Broker; |
| 261 | + use Symfony\Component\Worker\Consumer\ConsumerInterface; |
| 262 | + use Symfony\Component\Worker\MessageCollection; |
| 263 | + |
| 264 | + class DumpConsumer implements ConsumerInterface |
| 265 | + { |
| 266 | + private $broker; |
| 267 | + |
| 268 | + public function __construct(Broker $broker) |
| 269 | + { |
| 270 | + $this->broker = $broker; |
| 271 | + } |
| 272 | + |
| 273 | + public function consume(MessageCollection $messageCollection) |
| 274 | + { |
| 275 | + foreach ($messageCollection as $message) { |
| 276 | + dump($message); |
| 277 | + |
| 278 | + $this->broker->ack($message); |
| 279 | + } |
| 280 | + } |
| 281 | + } |
| 282 | + |
| 283 | +Finally plug everything together:: |
| 284 | + |
| 285 | + use AppBundle\Consumer\DumpConsumer; |
| 286 | + use Symfony\Component\Amqp\Broker; |
| 287 | + use Symfony\Component\Worker\Loop\Loop; |
| 288 | + use Symfony\Component\Worker\MessageFetcher\AmqpMessageFetcher; |
| 289 | + |
| 290 | + $broker = new Broker(); |
| 291 | + $fetcher = new AmqpMessageFetcher($broker, 'queue_name'); |
| 292 | + $consumer = new DumpConsumer($broker); |
| 293 | + |
| 294 | + $loop = new Loop(new DirectRouter($fetcher, $consumer)); |
| 295 | + |
| 296 | + $loop->run(); |
| 297 | + |
| 298 | +Message Fetcher |
| 299 | +--------------- |
| 300 | + |
| 301 | +* ``AmqpMessageFetcher``: Proxy to interact with an AMQP server |
| 302 | +* ``BufferedMessageFetcher``: Wrapper to buffer some message. Useful if you want to call an API in a "bulk" way. |
| 303 | +* ``InMemoryMessageFetcher``: Useful in test env |
| 304 | + |
| 305 | +Router |
| 306 | +------ |
| 307 | + |
| 308 | +The router has the responsibility to fetch a message, then to dispatch it to a |
| 309 | +consumer. |
| 310 | + |
| 311 | +* ``DirectRouter``: Use a ``MessageFetcherInterface`` and a ``ConsumerInterface``. Each message fetched is passed to the consumer. |
| 312 | +* ``RoundRobinRouter``: Wrapper to be able to fetch message from various sources. |
| 313 | + |
| 314 | +</details> |
| 315 | + |
| 316 | +--- |
| 317 | + |
| 318 | +In Symfony full stack, everything is simpler. |
| 319 | + |
| 320 | +I have forked [the standard edition](https://github.com/lyrixx/symfony-standard/tree/amqp) |
| 321 | +to show how it works. |
0 commit comments