@@ -62,9 +62,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
62
62
{
63
63
$ this ->connectionOptions = array_replace_recursive ([
64
64
'delay ' => [
65
- 'routing_key_pattern ' => 'delay_%exchange_name%_%routing_key%_%delay% ' ,
66
65
'exchange_name ' => 'delay ' ,
67
- 'queue_name_pattern ' => 'delay_queue_ %exchange_name%_%routing_key%_%delay% ' ,
66
+ 'queue_name_pattern ' => 'delay_ %exchange_name%_%routing_key%_%delay% ' ,
68
67
],
69
68
], $ connectionOptions );
70
69
$ this ->exchangeOptions = $ exchangeOptions ;
@@ -93,9 +92,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
93
92
* * flags: Exchange flags (Default: AMQP_DURABLE)
94
93
* * arguments: Extra arguments
95
94
* * delay:
96
- * * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%")
97
- * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%")
98
- * * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
95
+ * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
96
+ * * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay")
99
97
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
100
98
* * prefetch_count: set channel prefetch count
101
99
*/
@@ -171,20 +169,20 @@ private static function normalizeQueueArguments(array $arguments): array
171
169
}
172
170
173
171
/**
174
- * @param int $delay The delay in milliseconds
175
- *
176
172
* @throws \AMQPException
177
173
*/
178
- public function publish (string $ body , array $ headers = [], int $ delay = 0 , AmqpStamp $ amqpStamp = null ): void
174
+ public function publish (string $ body , array $ headers = [], int $ delayInMs = 0 , AmqpStamp $ amqpStamp = null ): void
179
175
{
180
- if (0 !== $ delay ) {
181
- $ this ->publishWithDelay ($ body , $ headers , $ delay , $ amqpStamp );
176
+ $ this ->clearWhenDisconnected ();
177
+
178
+ if (0 !== $ delayInMs ) {
179
+ $ this ->publishWithDelay ($ body , $ headers , $ delayInMs , $ amqpStamp );
182
180
183
181
return ;
184
182
}
185
183
186
184
if ($ this ->shouldSetup ()) {
187
- $ this ->setup ();
185
+ $ this ->setupExchangeAndQueues ();
188
186
}
189
187
190
188
$ this ->publishOnExchange (
@@ -213,9 +211,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp
213
211
{
214
212
$ routingKey = $ this ->getRoutingKeyForMessage ($ amqpStamp );
215
213
216
- if ($ this ->shouldSetup ()) {
217
- $ this ->setupDelay ($ delay , $ routingKey );
218
- }
214
+ $ this ->setupDelay ($ delay , $ routingKey );
219
215
220
216
$ this ->publishOnExchange (
221
217
$ this ->getDelayExchange (),
@@ -241,15 +237,12 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
241
237
242
238
private function setupDelay (int $ delay , ?string $ routingKey )
243
239
{
244
- if (! $ this ->channel ()-> isConnected ()) {
245
- $ this ->clear ();
240
+ if ($ this ->shouldSetup ()) {
241
+ $ this ->setup (); // setup delay exchange and normal exchange for delay queue to DLX messages to
246
242
}
247
243
248
- $ this ->exchange ()->declareExchange (); // setup normal exchange for delay queue to DLX messages to
249
- $ this ->getDelayExchange ()->declareExchange ();
250
-
251
244
$ queue = $ this ->createDelayQueue ($ delay , $ routingKey );
252
- $ queue ->declareQueue ();
245
+ $ queue ->declareQueue (); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
253
246
$ queue ->bind ($ this ->connectionOptions ['delay ' ]['exchange_name ' ], $ this ->getRoutingKeyForDelay ($ delay , $ routingKey ));
254
247
}
255
248
@@ -283,6 +276,9 @@ private function createDelayQueue(int $delay, ?string $routingKey)
283
276
));
284
277
$ queue ->setArguments ([
285
278
'x-message-ttl ' => $ delay ,
279
+ // delete the delay queue 10 seconds after the message expires
280
+ // publishing another message redeclares the queue which renews the lease
281
+ 'x-expires ' => $ delay + 10000 ,
286
282
'x-dead-letter-exchange ' => $ this ->exchangeOptions ['name ' ],
287
283
// after being released from to DLX, make sure the original routing key will be used
288
284
// we must use an empty string instead of null for the argument to be picked up
@@ -297,7 +293,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
297
293
return str_replace (
298
294
['%delay% ' , '%exchange_name% ' , '%routing_key% ' ],
299
295
[$ delay , $ this ->exchangeOptions ['name ' ], $ finalRoutingKey ?? '' ],
300
- $ this ->connectionOptions ['delay ' ]['routing_key_pattern ' ]
296
+ $ this ->connectionOptions ['delay ' ]['queue_name_pattern ' ]
301
297
);
302
298
}
303
299
@@ -308,8 +304,10 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
308
304
*/
309
305
public function get (string $ queueName ): ?\AMQPEnvelope
310
306
{
307
+ $ this ->clearWhenDisconnected ();
308
+
311
309
if ($ this ->shouldSetup ()) {
312
- $ this ->setup ();
310
+ $ this ->setupExchangeAndQueues ();
313
311
}
314
312
315
313
try {
@@ -319,7 +317,7 @@ public function get(string $queueName): ?\AMQPEnvelope
319
317
} catch (\AMQPQueueException $ e ) {
320
318
if (404 === $ e ->getCode () && $ this ->shouldSetup ()) {
321
319
// If we get a 404 for the queue, it means we need to setup the exchange & queue.
322
- $ this ->setup ();
320
+ $ this ->setupExchangeAndQueues ();
323
321
324
322
return $ this ->get ();
325
323
}
@@ -342,10 +340,12 @@ public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQ
342
340
343
341
public function setup (): void
344
342
{
345
- if (! $ this ->channel ()-> isConnected ()) {
346
- $ this ->clear ();
347
- }
343
+ $ this ->setupExchangeAndQueues ();
344
+ $ this ->getDelayExchange ()-> declareExchange ();
345
+ }
348
346
347
+ private function setupExchangeAndQueues (): void
348
+ {
349
349
$ this ->exchange ()->declareExchange ();
350
350
351
351
foreach ($ this ->queuesOptions as $ queueName => $ queueConfig ) {
@@ -424,12 +424,14 @@ public function exchange(): \AMQPExchange
424
424
return $ this ->amqpExchange ;
425
425
}
426
426
427
- private function clear (): void
427
+ private function clearWhenDisconnected (): void
428
428
{
429
- $ this ->amqpChannel = null ;
430
- $ this ->amqpQueues = [];
431
- $ this ->amqpExchange = null ;
432
- $ this ->amqpDelayExchange = null ;
429
+ if (!$ this ->channel ()->isConnected ()) {
430
+ $ this ->amqpChannel = null ;
431
+ $ this ->amqpQueues = [];
432
+ $ this ->amqpExchange = null ;
433
+ $ this ->amqpDelayExchange = null ;
434
+ }
433
435
}
434
436
435
437
private function shouldSetup (): bool
0 commit comments