@@ -58,9 +58,21 @@ class Connection
58
58
*/
59
59
private $ amqpDelayExchange ;
60
60
61
+ public function __construct (array $ connectionOptions , array $ exchangeOptions , array $ queuesOptions , AmqpFactory $ amqpFactory = null )
62
+ {
63
+ $ this ->connectionOptions = array_replace_recursive ([
64
+ 'delay ' => [
65
+ 'routing_key_pattern ' => 'delay_%routing_key%_%delay% ' ,
66
+ 'exchange_name ' => 'delay ' ,
67
+ 'queue_name_pattern ' => 'delay_queue_%routing_key%_%delay% ' ,
68
+ ],
69
+ ], $ connectionOptions );
70
+ $ this ->exchangeOptions = $ exchangeOptions ;
71
+ $ this ->queuesOptions = $ queuesOptions ;
72
+ $ this ->amqpFactory = $ amqpFactory ?: new AmqpFactory ();
73
+ }
74
+
61
75
/**
62
- * Constructor.
63
- *
64
76
* Available options:
65
77
*
66
78
* * host: Hostname of the AMQP service
@@ -81,29 +93,19 @@ class Connection
81
93
* * delay:
82
94
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
83
95
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
84
- * * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry ")
96
+ * * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay ")
85
97
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
86
- * * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
87
98
* * prefetch_count: set channel prefetch count
88
99
*/
89
- public function __construct (array $ connectionOptions , array $ exchangeOptions , array $ queuesOptions , AmqpFactory $ amqpFactory = null )
90
- {
91
- $ this ->connectionOptions = array_replace_recursive ([
92
- 'delay ' => [
93
- 'routing_key_pattern ' => 'delay_%routing_key%_%delay% ' ,
94
- 'exchange_name ' => 'delay ' ,
95
- 'queue_name_pattern ' => 'delay_queue_%routing_key%_%delay% ' ,
96
- ],
97
- ], $ connectionOptions );
98
- $ this ->exchangeOptions = $ exchangeOptions ;
99
- $ this ->queuesOptions = $ queuesOptions ;
100
- $ this ->amqpFactory = $ amqpFactory ?: new AmqpFactory ();
101
- }
102
-
103
100
public static function fromDsn (string $ dsn , array $ options = [], AmqpFactory $ amqpFactory = null ): self
104
101
{
105
102
if (false === $ parsedUrl = parse_url ($ dsn )) {
106
- throw new InvalidArgumentException (sprintf ('The given AMQP DSN "%s" is invalid. ' , $ dsn ));
103
+ // this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
104
+ if ('amqp:// ' !== $ dsn ) {
105
+ throw new InvalidArgumentException (sprintf ('The given AMQP DSN "%s" is invalid. ' , $ dsn ));
106
+ }
107
+
108
+ $ parsedUrl = [];
107
109
}
108
1
10000
10
109
111
$ pathParts = isset ($ parsedUrl ['path ' ]) ? explode ('/ ' , trim ($ parsedUrl ['path ' ], '/ ' )) : [];
@@ -275,18 +277,17 @@ private function createDelayQueue(int $delay, ?string $routingKey)
275
277
$ queue = $ this ->amqpFactory ->createQueue ($ this ->channel ());
276
278
$ queue ->setName (str_replace (
277
279
['%delay% ' , '%routing_key% ' ],
278
- [$ delay , $ routingKey ?: '' ],
280
+ [$ delay , $ routingKey ?? '' ],
279
281
$ this ->connectionOptions ['delay ' ]['queue_name_pattern ' ]
280
- ));
282
+ ));
281
283
$ queue ->setArguments ([
282
284
'x-message-ttl ' => $ delay ,
283
285
'x-dead-letter-exchange ' => $ this ->exchange ()->getName (),
284
286
]);
285
287
286
- if (null !== $ routingKey ) {
287
- // after being released from to DLX, this routing key will be used
288
- $ queue ->setArgument ('x-dead-letter-routing-key ' , $ routingKey );
289
- }
288
+ // after being released from to DLX, make sure the original routing key will be used
289
+ // we must use an empty string instead of null for the argument to be picked up
290
+ $ queue ->setArgument ('x-dead-letter-routing-key ' , $ routingKey ?? '' );
290
291
291
292
return $ queue ;
292
293
}
@@ -295,7 +296,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
295
296
{
296
297
return str_replace (
297
298
['%delay% ' , '%routing_key% ' ],
298
- [$ delay , $ finalRoutingKey ?: '' ],
299
+ [$ delay , $ finalRoutingKey ?? '' ],
299
300
$ this ->connectionOptions ['delay ' ]['routing_key_pattern ' ]
300
301
);
301
302
}
0 commit comments