8000 PR description REMOVE ME · symfony/symfony@06da874 · GitHub
[go: up one dir, main page]

Skip to content

Commit 06da874

Browse files
committed
PR description REMOVE ME
1 parent e891d55 commit 06da874

File tree

1 file changed

+321
-0
lines changed

1 file changed

+321
-0
lines changed

pr.body.md

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
Hello.
2+
3+
I'm happy and excited to share with you 2 new components.
4+
5+
note: The PR description (what you are currently reading) is also committed (as
6+
`pr.body.md`). I will remove it just before the merge. Like that you could also
7+
ask question about the "documentation". But please, don't over-comment the
8+
"language / English". This part of the job will be done in the doc repository.
9+
10+
### AMQP
11+
12+
It is a library created at @sensiolabs few years ago (Mon Mar 18 17:26:01 2013 +0100).
13+
Its goal is to ease the communication with a service that implement [AMQP](https://fr.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol)
14+
For example, [RabbitMQ](http://www.rabbitmq.com/) implements AMQP.
15+
16+
At that time, [Swarrot](https://github.com/swarrot/swarrot) did not exist yet
17+
and only [php-amqplib](https://github.com/php-amqplib/php-amqplib) existed.
18+
19+
We started by using php-amqplib but we faced many issues: memory leak, bad
20+
handling of signal, poor documentation
21+
22+
So we decided to stop using it and to build our own library. Over the years, we
23+
added very nice features, we fixed very weird edge case and we gain real
24+
expertise on AMQP.
25+
26+
Nowadays, it's very common to use AMQP in a web / CLI project.
27+
28+
So four years later, we decided to open-source it and to add it to Symfony to
29+
leverage the Symfony Ecosystem (code quality, release process, documentation,
30+
visibility, community, etc.)
31+
32+
So basically it's an abstraction of the [AMQP pecl](https://github.com/pdezwart/php-amqp/).
33+
34+
Here is the README.rst we had for this lib. I have updated it to match the
35+
version that will land in Symfony
36+
37+
<details>
38+
<summary>The old README (but updated)</summary>
39+
40+
Symfony AMQP
41+
============
42+
43+
Fed up of writing the same boiler-plate code over and over again whenever you
44+
need to use your favorite AMQP broker? Have you a hard time remembering how to
45+
publish a message or how to wire exchanges and queues? I had the exact same
46+
feeling. There are many AMQP libraries providing a very good low-level access to
47+
the AMQP protocol, but what about providing a simple API for abstracting the
48+
most common use cases? This library gives you an opinionated way of using any
49+
AMQP brokers and it also provides a nice and consistent API for low-level
50+
interaction with any AMQP brokers.
51+
52+
Dependencies
53+
------------
54+
55+
This library depends on the ``amqp`` PECL extensions (version 1.4.0-beta2 or
56+
later)::
57+
58+
sudo apt-get install php-amqp
59+
60+
Using the Conventions
61+
---------------------
62+
63+
The simplest usage of an AMQP broker is sending a message that is consumed by
64+
another script::
65+
66+
use Symfony\Component\Amqp\Broker;
67+
68+
// connects to a local AMQP broker by default
69+
$broker = new Broker();
70+
71+
// publish a message on the 'log' queue
72+
$broker->publish('log', 'some message');
73+
74+
// in another script (non-blocking)
75+
// $message is false if no messages are in the queue
76+
$message = $broker->get('log');
77+
78+
// blocking (waits for a message to be available in the queue)
79+
$message = $broker->consume('log');
80+
81+
The example above is based on some "conventions" and as such makes the
82+
following assumptions:
83+
84+
* A default exchange is used to publish the message (named
85+
``symfony.default``);
86+
87+
* The routing is done via the routing key (``log`` in this example);
88+
89+
* Queues and exchanges are created implicitly when first accessed;
90+
91+
* The connection to the broker is done lazily whenever a message must be sent
92+
or received.
93+
94+
Retrying a Message
95+
------------------
96+
97+
Retrying processing a message when an error occurs is as easy as defining a
98+
retry strategy on a queue::
99+
100+
use Symfony\Component\Amqp\RetryStrategy\ConstantRetryStrategy;
101+
102+
// configure the queue explicitly
103+
$broker->createQueue('log', array(
104+
// retry every 5 seconds
105+
'retry_strategy' => new ConstantRetryStrategy(5),
106+
));
107+
108+
Whenever you ``$broker->retry()`` a message, it is going to be automatically re-
109+
enqueued after a ``5`` seconds wait for a retry.
110+
111+
You can also drop the message after a limited number of retries (``2`` in the
112+
following example)::
113+
114+
$broker->createQueue('log', array(
115+
// retry 2 times
116+
'retry_strategy' => new ConstantRetryStrategy(5, 2),
117+
));
118+
119+
Instead of trying every ``n`` seconds, you can also use a retry mechanism based
120+
on a truncated exponential backoff algorithm::
121+
122+
use Symfony\Component\Amqp\RetryStrategy\ExponentialRetryStrategy;
123+
124+
$broker->createQueue('log', array(
125+
// retry 5 times
126+
'retry_strategy' => new ExponentialRetryStrategy(5),
127+
));
128+
129+
The message will be re-enqueued after 1 second the first time you call
130+
``retry()``, ``2^1`` seconds the second F438 time, ``2^2`` seconds the third time,
131+
and ``2^n`` seconds the nth time. If you want to wait more than 1 second the
132+
first time, you can pass an offset::
133+
134+
$broker->createQueue('log', array(
135+
// starts at 2^3
136+
'retry_strategy' => new ExponentialRetryStrategy(5, 3),
137+
));
138+
139+
.. note::
140+
141+
The retry strategies are implemented by using the dead-lettering feature of
142+
AMQP. Behind the scene, a special exchange is bound to queues configured
143+
based on the retry strategy you set.
144+
145+
.. note::
146+
Don't forget to ``ack`` or ``nack`` your message if you retry it. And
147+
obviously you should not use the AMQP_Requeue flag.
148+
149+
Configuring a Broker
150+
--------------------
151+
152+
By default, a broker tries to connect to a local AMQP broker with the default
153+
port, username, and password. If you have a different setting, pass a URI to
154+
the ``Broker`` constructor::
155+
156+
$broker = new Broker('amqp://user:pass@10.1.2.3:345/some-vhost');
157+
158+
Configuring an Exchange
159+
-----------------------
160+
161+
The default exchange used by the library is of type ``direct``. You can also
162+
create your own exchange::
163+
164+
// define a new fanout exchange
165+
$broker->createExchange('sensiolabs.fanout', array('type' => \AMQP_EX_TYPE_FANOUT));
166+
167+
You can then binding a queue to this named exchange easily::
168+
169+
$broker->createQueue('logs', array('exchange' => 'sensiolabs.fanout', 'routing_keys' => null));
170+
$broker->createQueue('logs.again', array('exchange' => 'sensiolabs.fanout', 'routing_keys' => null));
171+
172+
The second argument of ``createExchange()`` takes an array of arguments passed
173 10000 +
to the exchange. The following keys are used to further configure the exchange:
174+
175+
* ``flags``: Sets the exchange flags;
176+
177+
* ``type``: Sets the type of the queue (see ``\AMQP_EX_TYPE_*`` constants).
178+
179+
.. note::
180+
181+
Note that ``createExchange()`` automatically declares the exchange.
182+
183+
Configuring a Queue
184+
-------------------
185+
186+
As demonstrated in some examples, you can create your own queue. As for the
187+
exchange, the second argument of the ``createQueue()`` method is a list of
188+
queue arguments; the following keys are used to further configure the queue:
189+
190+
* ``exchange``: The exchange name to bind the queue to (the default exchange is
191+
used if not set);
192+
193+
* ``flags``: Sets the exchange flags;
194+
195+
* ``bind_arguments``: An array of arguments to pass when binding the queue with
196+
an exchange;
197+
198+
* ``retry_strategy``: The retry strategy to use (an instance of
199+
``RetryStrategyInterface``).
200+
201+
.. note::
202+
203+
Note that ``createQueue()`` automatically declares and binds the queue.
204+
205+
Implementation details
206+
----------------------
207+
208+
The retry strategy
209+
..................
210+
211+
The retry strategy is implemented with two customs and privates exchanges:
212+
``symfony.dead_letter`` and ``symfony.retry``.
213+
214+
Calling ``Broker::retry`` will publish the same message in the
215+
``symfony.dead_letter`` exchange.
216+
217+
This exchange will route the message to a queue named like
218+
``%exchange%.%time%.wait``. For example ``sensiolabs.default.000005.wait``. This
219+
queue has a TTL of 5 seconds. It means that if nothing consumes this message, it
220+
will be dropped after 5 seconds. But this queue has also a Dead Letter (DL). It
221+
means that instead of dropping the message, the AMQP server will re-publish
222+
automatically the message to the Exchange configured as DL.
223+
224+
After 5 seconds the message will be re-published to ``symfony.retry`` Exchange.
225+
This exchange is bound with every single queues. Finally, the message will land
226+
in the original queue.
227+
228+
</details>
229+
230+
### Worker
231+
232+
The second component was extracted from our internal SensioLabsAmqp component.
233+
We extracted it as is decoupled from the AMQP component. Thus it could be used,
234+
for example, to write redis, kafka daemon.
235+
236+
<details>
237+
<summary>Documentation</summary>
238+
239+
Symfony Worker
240+
==============
241+
242+
The worker component help you to write simple but flexible daemon.
243+
244+
Introduction
245+
------------
246+
247+
First you need something that ``fetch`` some messages. If the message are sent
248+
to AMQP, you should use the ``AmqpMessageFetcher``::
249+
250+
use Symfony\Component\Amqp\Broker;
251+
use Symfony\Component\Worker\MessageFetcher\AmqpMessageFetcher;
252+
253+
$broker = new Broker();
254+
$fetcher = new AmqpMessageFetcher($broker, 'queue_name');
255+
256+
Then you need a Consumer that will ``consumer`` each AMQP message::
257+
258+
namespace AppBundle\Consumer;
259+
260+
use Symfony\Component\Amqp\Broker;
261+
use Symfony\Component\Worker\Consumer\ConsumerInterface;
262+
use Symfony\Component\Worker\MessageCollection;
263+
264+
class DumpConsumer implements ConsumerInterface
265+
{
266+
private $broker;
267+
268+
public function __construct(Broker $broker)
269+
{
270+
$this->broker = $broker;
271+
}
272+
273+
public function consume(MessageCollection $messageCollection)
274+
{
275+
foreach ($messageCollection as $message) {
276+
dump($message);
277+
278+
$this->broker->ack($message);
279+
}
280+
}
281+
}
282+
283+
Finally plug everything together::
284+
285+
use AppBundle\Consumer\DumpConsumer;
286+
use Symfony\Component\Amqp\Broker;
287+
use Symfony\Component\Worker\Loop\Loop;
288+
use Symfony\Component\Worker\MessageFetcher\AmqpMessageFetcher;
289+
290+
$broker = new Broker();
291+
$fetcher = new AmqpMessageFetcher($broker, 'queue_name');
292+
$consumer = new DumpConsumer($broker);
293+
294+
$loop = new Loop(new DirectRouter($fetcher, $consumer));
295+
296+
$loop->run();
297+
298+
Message Fetcher
299+
---------------
300+
301+
* ``AmqpMessageFetcher``: Proxy to interact with an AMQP server
302+
* ``BufferedMessageFetcher``: Wrapper to buffer some message. Useful if you want to call an API in a "bulk" way.
303+
* ``InMemoryMessageFetcher``: Useful in test env
304+
305+
Router
306+
------
307+
308+
The router has the responsibility to fetch a message, then to dispatch it to a
309+
consumer.
310+
311+
* ``DirectRouter``: Use a ``MessageFetcherInterface`` and a ``ConsumerInterface``. Each message fetched is passed to the consumer.
312+
* ``RoundRobinRouter``: Wrapper to be able to fetch message from various sources.
313+
314+
</details>
315+
316+
---
317+
318+
In Symfony full stack, everything is simpler.
319+
320+
I have forked [the standard edition](https://github.com/lyrixx/symfony-standard/tree/amqp)
321+
to show how it works.

0 commit comments

Comments
 (0)
0