Issue with Messenger config and persistent messages #28885

Closed
konradja100 opened this issue Oct 16, 2018 · 12 comments

@konradja100
konradja100 commented Oct 16, 2018

Symfony version: 4.1.6

Hi, our company is struggling with a Messenger configuration problem. We are using the Messenger component (symfony/amqp-pack included) and RabbitMQ, and everything works fine, but we want to send messages as persistent. Is there a way to set this up by default in the configuration? Our solution is simple: we wrote a custom message transport and overrode the headers sent in the publish method of the Connection class:

    /**
     * @throws \AMQPException
     */
    public function publish(string $body, array $headers = array()): void
    {
        if ($this->debug && $this->shouldSetup()) {
            $this->setup();
        }

        $this->exchange()->publish($body, null, AMQP_NOPARAM, array('delivery_mode' => 2, 'headers' => $headers));
    }

configuration example:

# services.yaml
App\Service\Messenger\PAmqpTransport\PAmqpTransportFactory:
        tags: [messenger.transport_factory]
        bind:
            $encoder: '@App\Service\Messenger\PAmqpTransport\Serializer'
            $decoder: '@App\Service\Messenger\PAmqpTransport\Serializer'
            $debug: false

# messenger.yaml
 doctrine: 'pamqp://%env(RABBITMQ_USER)%:%env(RABBITMQ_PASS)%@%env(RABBITMQ_HOST)%:%env(RABBITMQ_PORT)%/%2f/doctrine'

We need to send 'delivery_mode' => 2 so that Rabbit keeps messages persistent. Shouldn't it be possible to set this option in the configuration?

@javiereguiluz javiereguiluz changed the title Messenger Issue with Messenger config and persistent messages Oct 17, 2018
@nicolas-grekas
Member

Would you like to give it a try? I suppose the delivery mode could be defined in the DSN, couldn't it? Any other options?

@diegogsann

I think it is not possible to define this in the DSN. @nicolas-grekas, I agree with @konradja100 that this option needs to be in the configuration, but is that possible? Can you help us with this goal?

Possibilities are:

  • define on transport layer and pass through sender
  • define in connection

If it is defined in the transport, we will need to pass it to the sender's send method, which today takes two arguments: body and headers. Then we need to change where $this->exchange()->publish(...) is called.

I am not sure about defining it in the connection, because delivery_mode concerns the message only (according to the Rabbit docs). We can use the same connection to send both message types (persistent and non-persistent).

@konradja100 could you give more details about your workaround? I'm having trouble with the encoder/decoder and the path that leads to the connection's publish method.

thanks.

@calin-marian
calin-marian commented Nov 15, 2018

I had to implement something similar on a project where we installed the RabbitMQ plugin https://github.com/rabbitmq/rabbitmq-delayed-message-exchange . For this plugin to work, a header needs to be set on the message.

My approach was to

  1. Create a new interface:
namespace App\Messenger\Message;

/**
 * Interface HeaderMessageInterface
 *
 * @package App\Messenger\Message
 */
interface HeaderMessageInterface {

  /**
   * Return an array of headers.
   *
   * @return array
   */
  public function getHeaders(): array;
}
  2. Extend the encoder (serializer) and check whether the message implements the defined interface. If it does, get the headers defined on the message and merge them with the existing headers from the parent encoder:
namespace App\Messenger\Transport\Serialization;

use App\Messenger\Message\HeaderMessageInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;

/**
 * Class HeaderSerializer
 *
 * @package App\Messenger\Transport\Serialization
 */
class HeaderSerializer extends Serializer {

  public function encode(Envelope $envelope): array {
    $encoded = parent::encode($envelope);
    $message = $envelope->getMessage();
    if ($message instanceof HeaderMessageInterface) {
      $encoded['headers'] = $encoded['headers'] + $message->getHeaders();
    }
    return $encoded;
  }

}
services:
    ...
    ldrp.messenger.transport.header_serializer:
        class: App\Messenger\Transport\Serialization\HeaderSerializer
    ...
  3. Change the encoder service used by messenger to the one from point 2:
framework:
    messenger:
        transports:
            ...
        routing:
            ...
        encoder: ldrp.messenger.transport.header_serializer
  4. When a message wants to send a specific header, it just has to implement the interface HeaderMessageInterface:
namespace App\Messenger\Message;

/**
 * Class SendRegistrationFiles
 *
 * @package App\Messenger\Message
 */
class SendRegistrationFiles implements HeaderMessageInterface {

  public const DELAY_IN_MILISECONDS = 10000;

  /**
   * @var string
   */
  public $message;

  /**
   * SendRegistrationFiles constructor.
   *
   * @param string $message
   */
  public function __construct($message = '') {
    $this->message = $message;
  }

  public function getHeaders(): array {
    return [
      'x-delay' => static::DELAY_IN_MILISECONDS,
    ];
  }


}

This implementation could be cleaned up by being adopted in the Messenger component itself: change the code of Symfony\Component\Messenger\Transport\Serialization\Serializer directly, adding something along the lines of (probably needs some adjustments, code not tested):

    ...
    if ($envelope->getMessage() instanceof HeaderMessageInterface) {
      $headers = $envelope->getMessage()->getHeaders();
    }
    ...

and just adding the new interface somewhere in the Messenger namespace.

@diegogsann
diegogsann commented Nov 15, 2018

@calin-marian I really appreciate your comment, but I'm quite confused now. Can you guys look at this issue about headers and properties in another repo?

I think header values make no difference to message persistence.
I added a header param to the $headers variable that is passed to the method exchange->connection->publish, and after restarting RabbitMQ the message was gone.

Notice that my method becomes something like this:

...
$this->exchange()->publish($body, null, AMQP_NOPARAM, array('delivery_mode' => 2, 'headers' => $headers));
...

and not like this:

...
$headers['delivery_mode'] = 2; // merge was forced here

//$headers['delivery-mode'] = 2; //or this one

$this->exchange()->publish($body, null, AMQP_NOPARAM, $headers);
...

I don't know about delayed messages, but I think they become very useful when we don't want to overload a queue at some point in time, for example when a lot of users make requests at once. Is this header the same kind of thing as delivery_mode (used as a header, but called a message property)?

anyone? thanks.
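
To make the difference between the two snippets above concrete, here is a minimal sketch in plain PHP. The helper name buildPublishAttributes is hypothetical (it is not part of Symfony or ext-amqp); it only shows the shape of the array that goes into the fourth argument of the exchange's publish() call, where delivery_mode sits at the top level and the application headers sit under their own 'headers' key.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: builds the attributes array passed as the 4th
 * argument of the exchange's publish() call.
 */
function buildPublishAttributes(array $headers, bool $persistent = true): array
{
    return [
        // Message property: 2 = persistent, 1 = transient.
        'delivery_mode' => $persistent ? 2 : 1,
        // Application headers are nested under their own key.
        'headers' => $headers,
    ];
}

$attributes = buildPublishAttributes(['type' => 'App\\Message\\Ping']);
// Assumed call site inside the custom Connection class:
// $this->exchange()->publish($body, null, AMQP_NOPARAM, $attributes);
```

Putting delivery_mode inside the headers array instead leaves the top-level property unset, which would explain why the message did not survive the restart.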

@konradja100
Author

@zhcdsan I've created simple project showing how we solve this issue:

https://github.com/industi/Symfony-RabbitMQ-Persistent-Messages/tree/master

All you need to do is to copy files from App\Service\PAqmpTransport, override configuration in services:

    App\Service\Messenger\PAmqpTransport\PAmqpTransportFactory:
        tags: [messenger.transport_factory]
        bind:
            $encoder: '@App\Service\Messenger\PAmqpTransport\Serializer'
            $decoder: '@App\Service\Messenger\PAmqpTransport\Serializer'
            $debug: false

and change the transport in config\packages\messenger.yaml to use 'pamqp' instead of the 'amqp' transport.

Messages should now be sent as persistent.

Hope that will help ;)

@calin-marian

@zhcdsan You are right, I checked the publish() method of the AMQPExchange class, and indeed, the fourth parameter is $attributes, not $headers. The accepted keys are: "One of content_type, content_encoding, message_id, user_id, app_id, delivery_mode, priority, timestamp, expiration, type or reply_to, headers.". So delivery_mode is not part of the headers array, it's a specific attribute. See the signature below:

    /**
     * Publish a message to an exchange.
     *
     * Publish a message to the exchange represented by the AMQPExchange object.
     *
     * @param string  $message     The message to publish.
     * @param string  $routing_key The optional routing key to which to
     *                             publish to.
     * @param integer $flags       One or more of AMQP_MANDATORY and
     *                             AMQP_IMMEDIATE.
     * @param array   $attributes  One of content_type, content_encoding,
     *                             message_id, user_id, app_id, delivery_mode,
     *                             priority, timestamp, expiration, type
     *                             or reply_to, headers.
     *
     * @throws AMQPExchangeException   On failure.
     * @throws AMQPChannelException    If the channel is not open.
     * @throws AMQPConnectionException If the connection to the broker was lost.
     *
     * @return boolean TRUE on success or FALSE on failure.
     */
    public function publish(
        $message,
        $routing_key = null,
        $flags = AMQP_NOPARAM,
        array $attributes = array()
    ) {
    }

Given that, I think my suggestion is good for adding custom headers to messages, and could maybe be spun off into its own issue, but it doesn't help with persistence, where you need to set the delivery_mode attribute.

However, the Messenger component doesn't define attributes, and is not sending any at this point. A workaround for delivery_mode alone would solve this issue, but wouldn't be the right way to move forward in my view. We should add the possibility to handle any attribute, not only delivery_mode, set some common-sense defaults, and provide a way for anyone who wants to define their own to do so.
I don't know what the best way to do that would be, but my personal feeling is that an interface would be a decent solution, where any message could define its own attributes.
The downside is that you would have to define them for each message, even though for some people defining them at the exchange level might be better.
The upside is that you would be able to define them for each message :). This allows for very granular control of how each message is published.

What are your thoughts on this?
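
A rough sketch of how the interface idea could look, in plain PHP and without any Symfony dependency. Every name here (AttributesMessageInterface, ImportantMessage, resolveAttributes) is hypothetical and only illustrates the proposal: transport-wide defaults are merged with whatever an individual message declares, and the per-message values win.

```php
<?php
declare(strict_types=1);

// Hypothetical interface: a message implements it to declare its own attributes.
interface AttributesMessageInterface
{
    /** @return array<string, mixed> for example ['priority' => 5] */
    public function getAttributes(): array;
}

// Hypothetical example message that only overrides the priority.
final class ImportantMessage implements AttributesMessageInterface
{
    public function getAttributes(): array
    {
        return ['priority' => 5];
    }
}

/**
 * Merge default attributes with the per-message ones (per-message wins),
 * then attach the encoded headers under the 'headers' key.
 */
function resolveAttributes(object $message, array $headers, array $defaults = ['delivery_mode' => 2]): array
{
    $own = $message instanceof AttributesMessageInterface ? $message->getAttributes() : [];

    return array_merge($defaults, $own, ['headers' => $headers]);
}

$attributes = resolveAttributes(new ImportantMessage(), ['type' => ImportantMessage::class]);
```

With this shape, messages that don't implement the interface still get the defaults, which would cover the "persistent everywhere" case without touching each message class.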

@konradja100
Author

In my opinion, just an option for configuring headers globally would do the job. In our situation we don't need to exclude some messages as non-persistent; if you have an example of such a case, please share it with us. But as you mentioned, allowing attributes to be configured at each level (globally/exchanges/queues/messages) would be nice. Until this functionality is implemented, we have to rely on a workaround.

In any case, some solution should be implemented, since right now we cannot use all the possibilities that Rabbit gives us.

@diegogsann

Yeah! Totally agree. According to AMQP, delivery_mode concerns the message.

Some time ago, the queue's default mode was changed to 'durable'. Unless there is some protocol change, I think it is acceptable to define properties on any message via an interface.

If it's possible to have it in configuration, every message could be sent using those properties. It could be something like:

messenger:
    transport:
    message:
        properties: 
            - ... 

or we could use something like publishOptions for the exchange... but I'm not certain about that.

@Preclowski

Maybe you should think about extending stamps somehow. It would be a nice way to control attributes and headers via something like AttributeStamp or HeaderStamp. You could create an AttributeStampInterface which could be handled in Transport\Serialization\Serializer::encodeStamps.

I'm still getting into the Messenger component, but I can see now that this is a real issue.

@meshenka
meshenka commented Apr 4, 2019

Any updates on the best way to set delivery_mode ?

@sroze
Contributor
sroze commented Apr 6, 2019

Proposed implementation in #30913 :)

@fabpot fabpot closed this as completed Apr 6, 2019
fabpot added a commit that referenced this issue Apr 6, 2019
…ttributes (sroze)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Uses an `AmqpStamp` to provide flags and attributes

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #28885
| License       | MIT
| Doc PR        | ø

Using the `AmqpStamp` you can configure the flags and any attribute (such as `delivery_mode`).

Commits
-------

56fa574 Uses an `AmqpStamp` to provide flags and attributes
@tlorand84

Is it possible that the "delivery_mode" attribute is not forwarded to the retry_strategy? It seems that, on a restart, I lost all the messages from the "delay" (temporary) queues.
