@@ -58,8 +58,22 @@ class Connection
58
58
*/
59
59
private $ amqpDelayExchange ;
60
60
61
+ public function __construct (array $ connectionOptions , array $ exchangeOptions , array $ queuesOptions , AmqpFactory $ amqpFactory = null )
62
+ {
63
+ $ this ->connectionOptions = array_replace_recursive ([
64
+ 'delay ' => [
65
+ 'routing_key_pattern ' => 'delay_%routing_key%_%delay% ' ,
66
+ 'exchange_name ' => 'delay ' ,
67
+ 'queue_name_pattern ' => 'delay_queue_%routing_key%_%delay% ' ,
68
+ ],
69
+ ], $ connectionOptions );
70
+ $ this ->exchangeOptions = $ exchangeOptions ;
71
+ $ this ->queuesOptions = $ queuesOptions ;
72
+ $ this ->amqpFactory = $ amqpFactory ?: new AmqpFactory ();
73
+ }
74
+
61
75
/**
62
- * Constructor .
76
+ * Creates a connection based on the DSN and options .
63
77
*
64
78
* Available options:
65
79
*
@@ -81,29 +95,19 @@ class Connection
81
95
* * delay:
82
96
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
83
97
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
84
- * * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry ")
98
+ * * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay ")
85
99
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
86
- * * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
87
100
* * prefetch_count: set channel prefetch count
88
101
*/
89
- public function __construct (array $ connectionOptions , array $ exchangeOptions , array $ queuesOptions , AmqpFactory $ amqpFactory = null )
90
- {
91
- $ this ->connectionOptions = array_replace_recursive ([
92
- 'delay ' => [
93
- 'routing_key_pattern ' => 'delay_%routing_key%_%delay% ' ,
94
- 'exchange_name ' => 'delay ' ,
95
- 'queue_name_pattern ' => 'delay_queue_%routing_key%_%delay% ' ,
96
- ],
97
- ], $ connectionOptions );
98
- $ this ->exchangeOptions = $ exchangeOptions ;
99
- $ this ->queuesOptions = $ queuesOptions ;
100
- $ this ->amqpFactory = $ amqpFactory ?: new AmqpFactory ();
101
- }
102
-
103
102
public static function fromDsn (string $ dsn , array $ options = [], AmqpFactory $ amqpFactory = null ): self
104
103
{
105
104
if (false === $ parsedUrl = parse_url ($ dsn )) {
106
- throw new InvalidArgumentException (sprintf ('The given AMQP DSN "%s" is invalid. ' , $ dsn ));
105
+ // this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
106
+ if ('amqp:// ' !== $ dsn ) {
107
+ throw new InvalidArgumentException (sprintf ('The given AMQP DSN "%s" is invalid. ' , $ dsn ));
108
+ }
109
+
110
+ $ parsedUrl = [];
107
111
}
108
112
109
113
$ pathParts = isset ($ parsedUrl ['path ' ]) ? explode ('/ ' , trim ($ parsedUrl ['path ' ], '/ ' )) : [];
@@ -275,18 +279,17 @@ private function createDelayQueue(int $delay, ?string $routingKey)
275
279
$ queue = $ this ->amqpFactory ->createQueue ($ this ->channel ());
276
280
$ queue ->setName (str_replace (
277
281
['%delay% ' , '%routing_key% ' ],
278
- [$ delay , $ routingKey ?: '' ],
282
+ [$ delay , $ routingKey ?? '' ],
279
283
$ this ->connectionOptions ['delay ' ]['queue_name_pattern ' ]
280
- ));
284
+ ));
281
285
$ queue ->setArguments ([
282
286
'x-message-ttl ' => $ delay ,
283
287
'x-dead-letter-exchange ' => $ this ->exchange ()->getName (),
284
288
]);
285
289
286
- if (null !== $ routingKey ) {
287
- // after being released from to DLX, this routing key will be used
288
- $ queue ->setArgument ('x-dead-letter-routing-key ' , $ routingKey );
289
- }
290
+ // after being released from to DLX, make sure the original routing key will be used
291
+ // we must use an empty string instead of null for the argument to be picked up
292
+ $ queue ->setArgument ('x-dead-letter-routing-key ' , $ routingKey ?? '' );
290
293
291
294
return $ queue ;
292
295
}
@@ -295,7 +298,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
295
298
{
296
299
return str_replace (
297
300
['%delay% ' , '%routing_key% ' ],
298
- [$ delay , $ finalRoutingKey ?: '' ],
301
+ [$ delay , $ finalRoutingKey ?? '' ],
299
302
$ this ->connectionOptions ['
42E5
;delay ' ]['routing_key_pattern ' ]
300
303
);
301
304
}
0 commit comments