Issue with Messenger config and persistent messages #28885
Comments
Would you like to give it a try? I suppose the delivery mode could be defined in the DSN, couldn't it? Any other options?
I don't think it is possible to define this in the DSN. @nicolas-grekas, I agree with @konradja100 that this option needs to be in the configuration, but is that possible? Can you help us with this? The possibilities are:
If it is defined in the transport, we will need to pass it to the sender's send method, which today takes two arguments (body and headers), and then change the place where $this->exchange()->publish(...) is called. I am not sure about defining it in the connection, because delivery_mode concerns the message only (according to the RabbitMQ docs), and we can use the same connection to send both message types (persistent and non-persistent). @konradja100, could you give more details about your workaround? I'm having trouble with the encoder/decoder and the path that leads to the connection's publish call. Thanks.
I had to implement something similar on a project where we installed the RabbitMQ plugin https://github.com/rabbitmq/rabbitmq-delayed-message-exchange. For this plugin to work, a header needs to be set on the message. My approach was the following:

    namespace App\Messenger\Message;

    /**
     * Interface HeaderMessageInterface
     *
     * @package App\Messenger\Message
     */
    interface HeaderMessageInterface {
        /**
         * Return an array of headers.
         *
         * @return array
         */
        public function getHeaders(): array;
    }


    namespace App\Messenger\Transport\Serialization;

    use App\Messenger\Message\HeaderMessageInterface;
    use Symfony\Component\Messenger\Envelope;
    use Symfony\Component\Messenger\Transport\Serialization\Serializer;

    /**
     * Class HeaderSerializer
     *
     * @package App\Messenger\Transport\Serialization
     */
    class HeaderSerializer extends Serializer {
        public function encode(Envelope $envelope): array {
            $encoded = parent::encode($envelope);
            $message = $envelope->getMessage();
            if ($message instanceof HeaderMessageInterface) {
                // Array union: headers already set by the parent serializer win on key conflicts.
                $encoded['headers'] = $encoded['headers'] + $message->getHeaders();
            }
            return $encoded;
        }
    }

    services:
        ...
        ldrp.messenger.transport.header_serializer:
            class: App\Messenger\Transport\Serialization\HeaderSerializer
        ...

    framework:
        messenger:
            transports:
                ...
            routing:
                ...
            encoder: ldrp.messenger.transport.header_serializer


    namespace App\Messenger\Message;

    /**
     * Class SendRegistrationFiles
     *
     * @package App\Messenger\Message
     */
    class SendRegistrationFiles implements HeaderMessageInterface {
        public const DELAY_IN_MILISECONDS = 10000;

        /**
         * @var string
         */
        public $message;

        /**
         * SendRegistrationFiles constructor.
         *
         * @param string $message
         */
        public function __construct($message = '') {
            $this->message = $message;
        }

        public function getHeaders(): array {
            // The delayed-message-exchange plugin reads the delay from the x-delay header.
            return [
                'x-delay' => static::DELAY_IN_MILISECONDS,
            ];
        }
    }

This implementation could be cleaned up if it were adopted by the Messenger component, by changing the code of ...

    if ($envelope->getMessage() instanceof HeaderMessageInterface) {
        $headers = $envelope->getMessage()->getHeaders();
    }

... and just adding the new interface somewhere in the Messenger namespace.
@calin-marian I really appreciate your comment. I'm very confused now. Can you guys see this issue here, about headers and properties, in another repo? I think that header attributes make no difference to message persistence. Note that my method becomes something like this:
and not like this:
I don't know about delayed messages, but I think they become very useful when we don't want to overload the queue at some point in time, for example when a lot of users make a request at once. Is this header the same as delivery_mode (used as a header, but called a message property)? Anyone? Thanks.
@zhcdsan I've created a simple project showing how we solved this issue: https://github.com/industi/Symfony-RabbitMQ-Persistent-Messages/tree/master All you need to do is copy the files from App\Service\PAqmpTransport, override the configuration in services:
and change the transport in config\packages\messenger.yaml to use 'pamqp' instead of the 'amqp' transport. Messages should now be sent as persistent. Hope that helps ;)
@zhcdsan You are right. I checked the publish() method of the AMQPExchange class, and indeed, the fourth parameter is $attributes:

    /**
     * Publish a message to an exchange.
     *
     * Publish a message to the exchange represented by the AMQPExchange object.
     *
     * @param string  $message     The message to publish.
     * @param string  $routing_key The optional routing key to which to
     *                             publish to.
     * @param integer $flags       One or more of AMQP_MANDATORY and
     *                             AMQP_IMMEDIATE.
     * @param array   $attributes  One of content_type, content_encoding,
     *                             message_id, user_id, app_id, delivery_mode,
     *                             priority, timestamp, expiration, type
     *                             or reply_to, headers.
     *
     * @throws AMQPExchangeException   On failure.
     * @throws AMQPChannelException    If the channel is not open.
     * @throws AMQPConnectionException If the connection to the broker was lost.
     *
     * @return boolean TRUE on success or FALSE on failure.
     */
    public function publish(
        $message,
        $routing_key = null,
        $flags = AMQP_NOPARAM,
        array $attributes = array()
    ) {
    }

Given that, I think my suggestion is good for adding custom headers to messages, and could maybe be spun off into its own issue, but it doesn't help with persistence, where you need to set the delivery_mode attribute. However, the Messenger component doesn't define attributes, and is not sending any at this point. Doing a workaround for the ... What are your thoughts on this?
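To make the distinction between headers and attributes concrete, here is a minimal sketch of the array that would be passed as the fourth argument of publish(). The helper `buildPublishAttributes()` and the two constants are hypothetical names invented for this illustration; they are not part of Messenger or ext-amqp. The only facts relied on are the `headers` and `delivery_mode` keys listed in the docblock above and the AMQP 0-9-1 values 1 (transient) and 2 (persistent).

```php
<?php

// Hypothetical constants for this sketch (AMQP 0-9-1 delivery modes).
const DELIVERY_MODE_TRANSIENT = 1;  // message may be lost on broker restart
const DELIVERY_MODE_PERSISTENT = 2; // broker stores the message on disk

/**
 * Hypothetical helper: builds the $attributes array for AMQPExchange::publish().
 * Custom headers (such as x-delay) are nested under 'headers', while
 * delivery_mode is a top-level attribute next to them.
 */
function buildPublishAttributes(array $headers, bool $persistent = true): array
{
    return [
        'headers' => $headers,
        'delivery_mode' => $persistent ? DELIVERY_MODE_PERSISTENT : DELIVERY_MODE_TRANSIENT,
    ];
}

$attributes = buildPublishAttributes(['x-delay' => 10000]);

// With a real AMQPExchange instance the call would look roughly like:
// $exchange->publish($body, $routingKey, AMQP_NOPARAM, $attributes);
```

Putting `delivery_mode` inside `headers` would send it as an ordinary application header, which is why a header-only serializer cannot make messages persistent.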
In my opinion, an option for configuring headers globally would be enough; in our situation we don't need to mark some messages as non-persistent. If you have an example of such a case, please share it with us. But as you mentioned, allowing attributes to be configured at each level (globally/exchanges/queues/messages) would be nice. Until that functionality is implemented, we have to rely on a workaround. In the end some solution should be implemented, because right now we cannot use everything RabbitMQ gives us.
Yeah! Totally agree. According to AMQP, delivery_mode concerns the message. Some time ago, the queue's default mode was changed to 'durable'. Unless there is a protocol change, I think it is acceptable to define properties on any message via an interface. If it's possible to have it in configuration, every message could be sent using those settings. It could be something like:
or we could use something like publishOptions for the exchange... but I'm not certain about that.
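The idea of defining properties per message via an interface, with configuration acting as a fallback, could be sketched as follows. Everything here is an assumption made for illustration: `AmqpAttributesAwareInterface`, `ImportantNotification` and `resolveAttributes()` are hypothetical names and do not exist in the Messenger component.

```php
<?php

// Hypothetical interface: a message that wants specific AMQP attributes implements it.
interface AmqpAttributesAwareInterface
{
    public function getAmqpAttributes(): array;
}

// Hypothetical message that asks to be published as persistent.
final class ImportantNotification implements AmqpAttributesAwareInterface
{
    public function getAmqpAttributes(): array
    {
        return ['delivery_mode' => 2];
    }
}

/**
 * Hypothetical resolver: per-message attributes take precedence over the
 * defaults (which could come from configuration).
 */
function resolveAttributes($message, array $defaults = []): array
{
    if ($message instanceof AmqpAttributesAwareInterface) {
        // Array union keeps the left-hand value when a key exists on both sides.
        return $message->getAmqpAttributes() + $defaults;
    }

    return $defaults;
}

$resolved = resolveAttributes(
    new ImportantNotification(),
    ['delivery_mode' => 1, 'priority' => 0]
);
```

A transport using this approach would pass the resolved array as the attributes argument when publishing, so messages that do not implement the interface simply get the configured defaults.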
Maybe you should think about extending stamps somehow. It would be a nice way to control attributes and headers via something like ... I'm still getting into the Messenger component, but now I see this is a real issue.
Any updates on the best way to set delivery_mode?
Proposed implementation in #30913 :)
…ttributes (sroze)
This PR was merged into the 4.3-dev branch.

Discussion
----------
[Messenger] Uses an `AmqpStamp` to provide flags and attributes

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #28885
| License       | MIT
| Doc PR        | ø

Using the `AmqpStamp` you can configure the flags and any attribute (such as `delivery_mode`).

Commits
-------
56fa574 Uses an `AmqpStamp` to provide flags and attributes
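For readers landing here, a hedged sketch of what using the stamp for persistent messages might look like. It assumes Symfony 4.3 or 4.4 with ext-amqp, the class location `Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp`, and a constructor taking (routing key, flags, attributes); later Symfony versions moved the AMQP transport to a bridge package, so check the namespace and signature for your version. `dispatchPersistent()` is a hypothetical helper name.

```php
<?php

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;

/**
 * Hypothetical helper: dispatches a message with delivery_mode 2 (persistent).
 *
 * @param object $message Any message routed to an AMQP transport.
 */
function dispatchPersistent(MessageBusInterface $bus, $message): Envelope
{
    // Assumed argument order: routing key, publish flags, attributes.
    $stamp = new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 2]);

    // Passing stamps as the second argument of dispatch() is available from 4.3.
    return $bus->dispatch($message, [$stamp]);
}
```

Because the stamp travels with the envelope, persistence can be chosen per message instead of per connection, which matches the earlier observation in this thread that delivery_mode concerns the message only.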
Is it possible that the "delivery_mode" attribute is not forwarded to the retry_strategy? It seems that, on a restart, I lost all the messages from the "delay" (temporary) queues.
Symfony version: 4.1.6
Hi, our company is struggling with a Messenger configuration problem. We are using the Messenger component (symfony/amqp-pack included) and RabbitMQ, and everything is working fine, but we want to send messages as persistent. Is there some way to set this up by default? Our solution is simple: we wrote a custom message transport, and we override the headers sent in the publish method of the Connection class:
configuration example:
We need to send
'delivery_mode' => 2
so that RabbitMQ keeps messages persistent. Shouldn't it be possible to set this option in the configuration?