10000 [Messenger] Add the transports documentation by sroze · Pull Request #9756 · symfony/symfony-docs · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Add the transports documentation #9756 8000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
8000 Add a documentation about Messenger's transports
  • Loading branch information
sroze committed May 10, 2018
commit eb9c20494a5e135e7b682daee00a497944457d82
96 changes: 5 additions & 91 deletions components/messenger.rst
8000
Original file line number Diff line number Diff line change
Expand Up @@ -92,102 +92,16 @@ Transports
In order to send and receive messages, you will have to configure a transport. An
transport will be responsible of communicating with your message broker or 3rd parties.

Your own sender
~~~~~~~~~~~~~~~

Using the ``SenderInterface``, you can easily create your own message sender.
Let's say you already have an ``ImportantAction`` message going through the
message bus and handled by a handler. Now, you also want to send this message as
an email.

First, create your sender::

namespace App\MessageSender;

use App\Message\ImportantAction;
use Symfony\Component\Messenger\Transport\SenderInterface;

class ImportantActionToEmailSender implements SenderInterface
{
private $toEmail;
private $mailer;

public function __construct(\Swift_Mailer $mailer, string $toEmail)
{
$this->mailer = $mailer;
$this->toEmail = $toEmail;
}

public function send($message)
{
if (!$message instanceof ImportantAction) {
throw new \InvalidArgumentException(sprintf('Producer only supports "%s" messages.', ImportantAction::class));
}

$this->mailer->send(
(new \Swift_Message('Important action made'))
->setTo($this->toEmail)
->setBody(
'<h1>Important action</h1><p>Made by '.$message->getUsername().'</p>',
'text/html'
)
);
}
}

Your own receiver
~~~~~~~~~~~~~~~~~

A receiver is responsible for receiving messages from a source and dispatching
them to the application.

Let's say you already processed some "orders" in your application using a
``NewOrder`` message. Now you want to integrate with a 3rd party or a legacy
application but you can't use an API and need to use a shared CSV file with new
orders.

You will read this CSV file and dispatch a ``NewOrder`` message. All you need to
do is to write your custom CSV receiver and Symfony will do the rest.

First, create your receiver::

namespace App\MessageReceiver;

use App\Message\NewOrder;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Serializer\SerializerInterface;

class NewOrdersFromCsvFile implements ReceiverInterface
{
private $serializer;
private $filePath;

public function __construct(SerializerInteface $serializer, string $filePath)
{
$this->serializer = $serializer;
$this->filePath = $filePath;
}

public function receive(callable $handler) : void
{
$ordersFromCsv = $this->serializer->deserialize(file_get_contents($this->filePath), 'csv');

foreach ($ordersFromCsv as $orderFromCsv) {
$handler(new NewOrder($orderFromCsv['id'], $orderFromCsv['account_id'], $orderFromCsv['amount']));
}
}
.. note:

public function stop(): void
{
// noop
}
}
Check out the :doc:`Messenger transports documentation </components/messenger/transports>`.

Receiver and Sender on the same bus
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To allow us to receive and send messages on the same bus and prevent an infinite
In order to receive and send messages on the same bus without having an infinite
loop, the message bus is equipped with the ``WrapIntoReceivedMessage`` middleware.
It will wrap the received messages into ``ReceivedMessage`` objects and the

This middleware wraps the received messages into ``ReceivedMessage`` objects and the
``SendMessageMiddleware`` middleware will know it should not route these
messages again to a transport.
222 changes: 222 additions & 0 deletions components/messenger/transports.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
.. index::
single: Messenger; Transports

Transports
==========

To send and receive messages via message brokers, the Messenger component has
some transports, to which you can route your message.

Every transport is configurable using a DSN. This DSN allows you to chose the
transport layer as well as configuring it.

AMQP
----

Very likely the most famous message broker protocol, AMQP, especially with RabbitMQ
has a built-in support within the Messenger component.

How it works?
~~~~~~~~~~~~~

The DSN protocol to use is ``amqp``. The following DSN example shows you how to
use the adapter to connect to a local RabbitMQ with the ``user`` username and
``password`` password. The messages will be sent to the ``messages`` exchange
bound to the ``messages`` queue on the ``/`` vhost::

amqp://user:password@localhost/%2f/messages

.. note:

By default, RabbitMQ uses ``guest`` as a username and ``guest`` as a password
and has a ``/`` vhost.

Error handling
~~~~~~~~~~~~~~

If something wrong happens (i.e. an exception is thrown) while handling your message,
the default behaviour is that your message will be "NACK" and requeued.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who is NACK? It sounds mean :).

Seriously, as someone not too familiar with this stuff, is NACK an "action". Like, a "message is NACKed"? OR, something different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not Acknowledge. Do you have any idea on how to phrase it differently? 🤷‍♂️

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beginning with this, i'll explain this like : Not Acknowledge = not treated and requeued.


However, if your exception implements the ``RejectMessageExceptionInterface`` interface,
the message will be rejected from the queue.

Retry
~~~~~

When receiving messages from a broker, it might happen that some exceptions will
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not clear what this means. Do you mean, when my app is handling a message (that just came from a broker), it could fail (e.g. DB exception, or my handler throws an exception)? Or, are you talking about some sort of communication error from the broker?

arise. Typically, a 3rd party provider is down or your system is under heavy load
and can't really process some messages. To handle this scenario, there is a built-in
retry mechanism that can be enabled via your DSN::
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't use the :: in these cases - it's short for .. code-block:: php, and it's just text below. Let's use:

.. code-block:: text


amqp://guest:guest@localhost/%2f/messages
?retry[attempts]=3
&retry[ttl][0]=10000
&retry[ttl][1]=30000
&retry[ttl][2]=60000

In the example above, if handling the message fails, it will retry it 3 times. After
the first failure, it will wait 10 seconds before trying it. After the 2nd failure,
it will wait 30 seconds. After the 3rd failure, it will wait a minute. If it still
fails, the message will be moved to a "dead queue" and you will have to manually
handle this message.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you just define a single TTL to be used for each retry? Or is there a default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are to be removed as retry wasn't merged in.


DSN configuration reference
~~~~~~~~~~~~~~~~~~~~~~~~~~~

The options available to in the DSN are documented on the ``Connection`` class
in the code repository.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Enqueue
-------

Probably one of the most famous PHP queue-broker libraries, Enqueue has 10+ adapters
with brokers like Kafka, Google Pub/Sub, AWS SQS and more. Check out the transport
documentation in `Enqueue's official repository`_.

Your own transport
------------------

If there is no available transport for your message broker, you can easily
create your own.

Your own sender
~~~~~~~~~~~~~~~

Using the ``SenderInterface``, you can easily create your own message sender.
Let's say you already have an ``ImportantAction`` message going through the
message bus and handled by a handler. Now, you also want to send this message as
an email.

First, create your sender::

namespace App\MessageSender;
Copy link
Member
@yceruto yceruto May 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could go suggesting a better structure for Messenger stuff? maybe App\Message\Sender or App\Messenger\Transport instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, why not 👍


use App\Message\ImportantAction;
use Symfony\Component\Messenger\Transport\SenderInterface;

class ImportantActionToEmailSender implements SenderInterface
{
private $toEmail;
private $mailer;

public function __construct(\Swift_Mailer $mailer, string $toEmail)
{
$this->mailer = $mailer;
$this->toEmail = $toEmail;
}

public function send($message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be updated according to the current contract, Envelope $... ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #9757

{
if (!$message instanceof ImportantAction) {
throw new \InvalidArgumentException(sprintf('Producer only supports "%s" messages.', ImportantAction::class));
}

$this->mailer->send(
(new \Swift_Message('Important action made'))
->setTo($this->toEmail)
->setBody(
'<h1>Important action</h1><p>Made by '.$message->getUsername().'</p>',
'text/html'
)
);
}
}

Your own receiver
~~~~~~~~~~~~~~~~~

A receiver is responsible for receiving messages from a source and dispatching
them to the application.

Let's say you already processed some "orders" in your application using a
``NewOrder`` message. Now you want to integrate with a 3rd party or a legacy
application but you can't use an API and need to use a shared CSV file with new
orders.

You will read this CSV file and dispatch a ``NewOrder`` message. All you need to
do is to write your custom CSV receiver and Symfony will do the rest.

First, create your receiver::

namespace App\MessageReceiver;

use App\Message\NewOrder;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Serializer\SerializerInterface;

class NewOrdersFromCsvFile implements ReceiverInterface
{
private $serializer;
private $filePath;

public function __construct(SerializerInteface $serializer, string $filePath)
{
$this->serializer = $serializer;
$this->filePath = $filePath;
}

public function receive(callable $handler) : void
{
$ordersFromCsv = $this->serializer->deserialize(file_get_contents($this->filePath), 'csv');

foreach ($ordersFromCsv as $orderFromCsv) {
$handler(new NewOrder($orderFromCsv['id'], $orderFromCsv['account_id'], $orderFromCsv['amount']));
}
}

public function stop(): void
{
// noop
}
}

Create your Transport Factory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You need to give FrameworkBundle the opportunity to create your transport from a
DSN. You will need an transport factory::

use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;

class YourTransportFactory implements TransportFactoryInterface
{
public function createTransport(string $dsn, array $options): TransportInterface
{
return new YourTransport(/* ... */);
}

public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'my-transport://');
}
}

The transport object is needs to implements the ``TransportInterface`` (which simply combine
the ``SenderInterface`` and ``ReceiverInterface``). It will look
like this::

class YourTransport implements TransportInterface
{
public function send($message) : void
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

{
// ...
}

public function receive(callable $handler) : void
{
// ...
}

public function stop() : void
{
// ...
}
}

If you plan to use it within a Symfony application, you should look at
:doc:`registering your transport factory </components/messenger>` with the FrameworkBundle.

.. _`Enqueue's official repository`: https://github.com/enqueue/messenger-adapter
46 changes: 0 additions & 46 deletions messenger.rst
Original file line number Diff line number Diff line change
Expand Up @@ -279,52 +279,6 @@ Your own Transport
Once you have written your transport's sender and receiver, you can register your
transport factory to be able to use it via a DSN in the Symfony application.

Create your Transport Factory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You need to give FrameworkBundle the opportunity to create your transport from a
DSN. You will need an transport factory::

use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;

class YourTransportFactory implements TransportFactoryInterface
{
public function createTransport(string $dsn, array $options): TransportInterface
{
return new YourTransport(/* ... */);
}

public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'my-transport://');
}
}

The transport object is needs to implements the ``TransportInterface`` (which simply combine
the ``SenderInterface`` and ``ReceiverInterface``). It will look
like this::

class YourTransport implements TransportInterface
{
public function send($message) : void
{
// ...
}

public function receive(callable $handler) : void
{
// ...
}

public function stop() : void
{
// ...
}
}

Register your factory
~~~~~~~~~~~~~~~~~~~~~

Expand Down
0