-
-
Notifications
You must be signed in to change notification settings - Fork 5.2k
Add a documentation about Messenger's transports #11331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b4e25a1
cf224cd
7ad2123
38f76c6
9055f2f
547971d
83b26c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,224 @@ | ||||||
.. index:: | ||||||
single: Messenger; Transports | ||||||
|
||||||
Transports | ||||||
========== | ||||||
|
||||||
To send and receive messages via message brokers, the Messenger component has | ||||||
"transports". Transports are responsible for routing messages to and from | ||||||
the message brokers. | ||||||
|
||||||
Every transport is configurable using a DSN. This DSN allows you to choose the | ||||||
transport layer as well as configure it. | ||||||
|
||||||
AMQP | ||||||
---- | ||||||
|
||||||
The most famous message broker protocol is probably AMQP, which is most | ||||||
commonly implemented with RabbitMQ. The Messenger component has built-in | ||||||
support for AMQP. | ||||||
|
||||||
How Does it Works? | ||||||
~~~~~~~~~~~~~~~~~~ | ||||||
|
||||||
A DSN that starts with ``amqp://`` is recognized and used to create | ||||||
an instance of the built-in AMQP transport:: | ||||||
|
||||||
$dsn = 'amqp://user:password@localhost/%2f/messages'; | ||||||
|
||||||
The messages will be sent to the ``messages`` exchange bound to the ``messages`` | ||||||
queue on the ``/`` vhost (the ``%2f`` is a url-encoded ``/``). | ||||||
|
||||||
.. note:: | ||||||
|
||||||
By default, RabbitMQ uses ``guest`` as a username and ``guest`` as a password | ||||||
and has a ``/`` vhost. | ||||||
|
||||||
Error Handling | ||||||
~~~~~~~~~~~~~~ | ||||||
|
||||||
If something wrong happens (i.e. an exception is thrown) while handling your message, | ||||||
the default behavior is that your message will be "NACK" and requeued. | ||||||
|
||||||
However, if your exception implements the ``RejectMessageExceptionInterface`` interface, | ||||||
the message will be rejected from the queue. | ||||||
|
||||||
Retry | ||||||
~~~~~ | ||||||
|
||||||
When receiving a message from a broker, it might happen that some exceptions will | ||||||
arise. Typically, a 3rd party provider is down or your system is under heavy load | ||||||
and can't really process some messages. To handle this scenario, there is a built-in | ||||||
retry mechanism that can be enabled via your DSN:: | ||||||
|
||||||
amqp://guest:guest@localhost/%2f/messages | ||||||
?retry[attempts]=3 | ||||||
&retry[ttl][0]=10000 | ||||||
&retry[ttl][1]=30000 | ||||||
&retry[ttl][2]=60000 | ||||||
|
||||||
In the example above, if handling the message fails, it will retry it 3 times. After | ||||||
the first failure, it will wait 10 seconds before trying it. After the second failure, | ||||||
it will wait 30 seconds. After the 3rd failure, it will wait a minute. If it still | ||||||
fails, the message will be moved to a "dead queue" and you will have to manually | ||||||
handle this message. | ||||||
|
||||||
DSN Configuration Reference | ||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||||
|
||||||
The options available in the DSN are documented on the ``Connection`` class | ||||||
in the code repository. | ||||||
|
||||||
Enqueue | ||||||
------- | ||||||
|
||||||
Probably one of the most famous PHP queue-broker libraries, Enqueue, has 10+ adapters | ||||||
with brokers like Kafka, Google Pub/Sub, AWS SQS and more. Check out the transport | ||||||
documentation in `Enqueue's official repository`_. | ||||||
|
||||||
Your own Transport | ||||||
------------------ | ||||||
< 8000 /td> |
|
|||||
If there is no available transport for your message broker, you can create your own. | ||||||
|
||||||
Your own Sender | ||||||
~~~~~~~~~~~~~~~ | ||||||
|
||||||
Using the ``SenderInterface``, you can create your own message sender. | ||||||
You already have an ``ImportantAction`` message going through the | ||||||
message bus and handled by a handler. Now, you also want to send this message as | ||||||
an email via your sender: | ||||||
|
||||||
.. code-block:: php | ||||||
|
||||||
// src/MessageSender/ImportantActionToEmailSender.php | ||||||
|
||||||
namespace App\MessageSender; | ||||||
llaakkkk marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
use App\Message\ImportantAction; | ||||||
use Symfony\Component\Messenger\Transport\SenderInterface; | ||||||
|
||||||
class ImportantActionToEmailSender implements SenderInterface | ||||||
{ | ||||||
private $toEmail; | ||||||
private $mailer; | ||||||
|
||||||
public function __construct(\Swift_Mailer $mailer, string $toEmail) | ||||||
{ | ||||||
$this->mailer = $mailer; | ||||||
$this->toEmail = $toEmail; | ||||||
} | ||||||
|
||||||
public function send($message) | ||||||
{ | ||||||
if (!$message instanceof ImportantAction) { | ||||||
throw new \InvalidArgumentException(sprintf('Producer only supports "%s" messages.', ImportantAction::class)); | ||||||
} | ||||||
|
||||||
$this->mailer->send( | ||||||
(new \Swift_Message('Important action made')) | ||||||
->setTo($this->toEmail) | ||||||
->setBody( | ||||||
'<h1>Important action</h1><p>Made by '.$message->getUsername().'</p>', | ||||||
'text/html' | ||||||
) | ||||||
); | ||||||
} | ||||||
} | ||||||
|
||||||
Your own receiver | ||||||
~~~~~~~~~~~~~~~~~ | ||||||
|
||||||
A receiver is responsible for receiving messages from a source and dispatching | ||||||
them to the application. | ||||||
|
||||||
You already processed some "orders" in your application using a | ||||||
``NewOrder`` message. Now you want to integrate with a 3rd party or a legacy | ||||||
application but you can't use an API and need to use a shared CSV file with new | ||||||
orders. | ||||||
|
||||||
You will read this CSV file and dispatch a ``NewOrder`` message. All you need to | ||||||
do is to write your custom CSV receiver and Symfony will do the rest:: | ||||||
|
||||||
namespace App\MessageReceiver; | ||||||
|
||||||
use App\Message\NewOrder; | ||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface; | ||||||
use Symfony\Component\Serializer\SerializerInterface; | ||||||
|
||||||
class NewOrdersFromCsvFile implements ReceiverInterface | ||||||
{ | ||||||
private $serializer; | ||||||
private $filePath; | ||||||
llaakkkk marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
public function __construct(SerializerInteface $serializer, string $filePath) | ||||||
{ | ||||||
$this->serializer = $serializer; | ||||||
$this->filePath = $filePath; | ||||||
} | ||||||
|
||||||
public function receive(callable $handler) : void | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
or is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @OskarStark there is code changes in closed PR with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @OskarStark It's ok to add a return type hint even if it isn't in the interface. I'd even say it is recommended for static analysis. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know but O am not sure how we handle this in docs in general 🤔 Ping @symfony/team-symfony-docs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wait, isn’t receive gone from the interface now? I think it’s get() |
||||||
{ | ||||||
$ordersFromCsv = $this->serializer->deserialize(file_get_contents($this->filePath), 'csv'); | ||||||
|
||||||
foreach ($ordersFromCsv as $orderFromCsv) { | ||||||
$handler(new NewOrder($orderFromCsv['id'], $orderFromCsv['account_id'], $orderFromCsv['amount'])); | ||||||
} | ||||||
} | ||||||
|
||||||
public function stop(): void | ||||||
llaakkkk marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
{ | ||||||
// noop | ||||||
} | ||||||
} | ||||||
|
||||||
Create your Transport Factory | ||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||||
|
||||||
You need to give FrameworkBundle the opportunity to create your transport from a | ||||||
DSN. You will need a transport factory:: | ||||||
|
||||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface; | ||||||
use Symfony\Component\Messenger\Transport\TransportInterface; | ||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface; | ||||||
use Symfony\Component\Messenger\Transport\SenderInterface; | ||||||
|
||||||
class YourTransportFactory implements TransportFactoryInterface | ||||||
{ | ||||||
public function createTransport(string $dsn, array $options): TransportInterface | ||||||
{ | ||||||
return new YourTransport(/* ... */); | ||||||
} | ||||||
|
||||||
public function supports(string $dsn, array $options): bool | ||||||
{ | ||||||
return 0 === strpos($dsn, 'my-transport://'); | ||||||
} | ||||||
} | ||||||
|
||||||
The transport object is needs to implements the ``TransportInterface`` (which simply combine | ||||||
the ``SenderInterface`` and ``ReceiverInterface``). It will look | ||||||
like this:: | ||||||
|
||||||
class YourTransport implements TransportInterface | ||||||
{ | ||||||
public function send($message) : void | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
or does it come from the interface? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The method in the public function send(Envelope $envelope): Envelope; |
||||||
{ | ||||||
// ... | ||||||
} | ||||||
|
||||||
public function receive(callable $handler) : void | ||||||
llaakkkk marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
{ | ||||||
// ... | ||||||
} | ||||||
|
||||||
public function stop() : void | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
or does it come from the interface? |
||||||
{ | ||||||
// ... | ||||||
} | ||||||
} | ||||||
|
||||||
If you plan to use it within a Symfony application, you should look at | ||||||
:doc:`registering your transport factory </components/messenger>` with the FrameworkBundle. | ||||||
|
||||||
.. _`Enqueue's official repository`: https://github.com/enqueue/messenger-adapter |
Uh oh!
There was an error while loading. Please reload this page.