@@ -62,7 +62,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
62
62
{
63
63
$ this ->connectionOptions = array_replace_recursive ([
64
64
'delay ' => [
65
- 'exchange_name ' => 'delay ' ,
65
+ 'exchange_name ' => 'delays ' ,
66
66
'queue_name_pattern ' => 'delay_%exchange_name%_%routing_key%_%delay% ' ,
67
67
],
68
68
], $ connectionOptions );
@@ -93,7 +93,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
93
93
* * arguments: Extra arguments
94
94
* * delay:
95
95
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
96
- * * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay ")
96
+ * * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays ")
97
97
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
98
98
* * prefetch_count: set channel prefetch count
99
99
*/
@@ -252,6 +252,11 @@ private function getDelayExchange(): \AMQPExchange
252
252
$ this ->amqpDelayExchange = $ this ->amqpFactory ->createExchange ($ this ->channel ());
253
253
$ this ->amqpDelayExchange ->setName ($ this ->connectionOptions ['delay ' ]['exchange_name ' ]);
254
254
$ this ->amqpDelayExchange ->setType (AMQP_EX_TYPE_DIRECT );
255
+ if ('delays ' === $ this ->connectionOptions ['delay ' ]['exchange_name ' ]) {
256
+ // only add the new flag when the name was not provided explicitly so we're using the new default name to prevent a redeclaration error
257
+ // the condition will be removed in 4.4
258
+ $ this ->amqpDelayExchange ->setFlags (AMQP_DURABLE );
259
+ }
255
260
}
256
261
257
262
return $ this ->amqpDelayExchange ;
@@ -274,16 +279,24 @@ private function createDelayQueue(int $delay, ?string $routingKey)
274
279
[$ delay , $ this ->exchangeOptions ['name ' ], $ routingKey ?? '' ],
275
280
$ this ->connectionOptions ['delay ' ]['queue_name_pattern ' ]
276
281
));
282
+ if ('delay_%exchange_name%_%routing_key%_%delay% ' === $ this ->connectionOptions ['delay ' ]['queue_name_pattern ' ]) {
283
+ // the condition will be removed in 4.4
284
+ $ queue ->setFlags (AMQP_DURABLE );
285
+ $ extraArguments = [
286
+ // delete the delay queue 10 seconds after the message expires
287
+ // publishing another message redeclares the queue which renews the lease
288
+ 'x-expires ' => $ delay + 10000 ,
289
+ ];
290
+ } else {
291
+ $ extraArguments = [];
292
+ }
277
293
$ queue ->setArguments ([
278
294
'x-message-ttl ' => $ delay ,
279
- // delete the delay queue 10 seconds after the message expires
280
- // publishing another message redeclares the queue which renews the lease
281
- 'x-expires ' => $ delay + 10000 ,
282
295
'x-dead-letter-exchange ' => $ this ->exchangeOptions ['name ' ],
283
296
// after being released from to DLX, make sure the original routing key will be used
284
297
// we must use an empty string instead of null for the argument to be picked up
285
298
'x-dead-letter-routing-key ' => $ routingKey ?? '' ,
286
- ]);
299
+ ] + $ extraArguments );
287
300
288
301
return $ queue ;
289
302
}
0 commit comments