@@ -35,7 +35,7 @@ class Connection
35
35
36
36
private $ connectionConfiguration ;
37
37
private $ exchangeConfiguration ;
38
- private $ queueConfiguration ;
38
+ private $ queuesConfiguration ;
39
39
private $ amqpFactory ;
40
40
41
41
/**
@@ -51,7 +51,7 @@ class Connection
51
51
/**
52
52
* @var \AMQPQueue|null
53
53
*/
54
- private $ amqpQueue ;
54
+ private $ amqpQueues = [] ;
55
55
56
56
/**
57
57
* @var \AMQPExchange|null
@@ -68,14 +68,14 @@ class Connection
68
68
* * vhost: Virtual Host to use with the AMQP service
69
69
* * user: Username to use to connect the the AMQP service
70
70
* * password: Password to use the connect to the AMQP service
71
- * * queue:
72
- * * name: Name of the queue
73
- * * routing_key: The routing key (if any) to use to push the messages to
71
+ * * queues[name]: An array of queues, keyed by the name
72
+ * * routing_keys: The routing keys (if any) to bind to this queue
74
73
* * flags: Queue flags (Default: AMQP_DURABLE)
75
74
* * arguments: Extra arguments
76
75
* * exchange:
77
76
* * name: Name of the exchange
78
77
* * type: Type of exchange (Default: fanout)
78
+ * * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
79
79
* * flags: Exchange flags (Default: AMQP_DURABLE)
80
80
* * arguments: Extra arguments
81
81
* * delay:
@@ -86,7 +86,7 @@ class Connection
86
86
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
87
87
* * prefetch_count: set channel prefetch count
88
88
*/
89
- public function __construct (array $ connectionConfiguration , array $ exchangeConfiguration , array $ queueConfiguration , AmqpFactory $ amqpFactory = null )
89
+ public function __construct (array $ connectionConfiguration , array $ exchangeConfiguration , array $ queuesConfiguration , AmqpFactory $ amqpFactory = null )
90
90
{
91
91
$ this ->connectionConfiguration = array_replace_recursive ([
92
92
'delay ' => [
@@ -96,7 +96,7 @@ public function __construct(array $connectionConfiguration, array $exchangeConfi
96
96
],
97
97
], $ connectionConfiguration );
98
98
$ this ->exchangeConfiguration = $ exchangeConfiguration ;
99
- $ this ->queueConfiguration = $ queueConfiguration ;
99
+ $ this ->queuesConfiguration = $ queuesConfiguration ;
100
100
$ this ->amqpFactory = $ amqpFactory ?: new AmqpFactory ();
101
101
}
102
102
@@ -111,8 +111,10 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
111
111
'host ' => $ parsedUrl ['host ' ] ?? 'localhost ' ,
112
112
'port ' => $ parsedUrl ['port ' ] ?? 5672 ,
113
113
'vhost ' => isset ($ pathParts [0 ]) ? urldecode ($ pathParts [0 ]) : '/ ' ,
114
- 'queue ' => [
115
- 'name ' => $ queueName = $ pathParts [1 ] ?? 'messages ' ,
114
+ 'queues ' => [
115
+ [
116
+ 'name ' => $ queueName = $ pathParts [1 ] ?? 'messages ' ,
117
+ ]
116
118
],
117
119
'exchange ' => [
118
120
'name ' => $ queueName ,
@@ -134,14 +136,18 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
134
136
}
135
137
136
138
$ exchangeOptions = $ amqpOptions ['exchange ' ];
137
- $ queueOptions = $ amqpOptions ['queue ' ];
138
- unset($ amqpOptions ['queue ' ], $ amqpOptions ['exchange ' ]);
139
+ $ queuesOptions = $ amqpOptions ['queues ' ];
140
+ unset($ amqpOptions ['queues ' ], $ amqpOptions ['exchange ' ]);
139
141
140
- if (\is_array ($ queueOptions ['arguments ' ] ?? false )) {
141
- $ queueOptions ['arguments ' ] = self ::normalizeQueueArguments ($ queueOptions ['arguments ' ]);
142
- }
142
+ $ queuesOptions = array_map (function (array $ queueOptions ) {
143
+ if (\is_array ($ queuesOptions ['arguments ' ] ?? false )) {
144
+ $ queueOptions ['arguments ' ] = self ::normalizeQueueArguments ($ queueOptions ['arguments ' ]);
145
+ }
146
+
147
+ return $ queueOptions ;
148
+ }, $ queuesOptions );
143
149
144
- return new self ($ amqpOptions , $ exchangeOptions , $ queueOptions , $ amqpFactory );
150
+ return new self ($ amqpOptions , $ exchangeOptions , $ queuesOptions , $ amqpFactory );
145
151
}
146
152
147
153
private static function normalizeQueueArguments (array $ arguments ): array
@@ -178,9 +184,11 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
178
184
$ this ->setup ();
179
185
}
180
186
181
- $ flags = $ this ->queueConfiguration ['flags ' ] ?? AMQP_NOPARAM ;
182
- $ attributes = $ this ->getAttributes ($ headers );
183
- $ routingKey = $ routingKey ?? $ this ->getExchangeRoutingKey ();
187
+ // TODO - allow flag & attributes to be configured on the message
188
+ $ flags = [];
189
+ $ attributes = [];
190
+ $ attributes = array_merge_recursive ($ attributes , ['headers ' => $ headers ]);
191
+ $ routingKey = $ routingKey ?? $ this ->getDefaultPublishRoutingKey ();
184
192
185
193
$ this ->exchange ()->publish ($ body , $ routingKey , $ flags , $ attributes );
186
194
}
@@ -194,14 +202,16 @@ private function publishWithDelay(string $body, array $headers, int $delay, ?str
194
202
$ this ->setupDelay ($ delay , $ exchangeRoutingKey );
195
203
}
196
204
205
+ // TODO - allow flag & attributes to be configured on the message
206
+ $ flags = [];
207
+ $ attributes = [];
208
+ $ attributes = array_merge_recursive ($ attributes , ['headers ' => $ headers ]);
197
209
$ routingKey = $ this ->getRoutingKeyForDelay ($ delay );
198
- $ flags = $ this ->queueConfiguration ['flags ' ] ?? AMQP_NOPARAM ;
199
- $ attributes = $ this ->getAttributes ($ headers );
200
210
201
211
$ this ->getDelayExchange ()->publish ($ body , $ routingKey , $ flags , $ attributes );
202
212
}
203
213
204
- private function setupDelay (int $ delay , ?string $ exchangeRoutingKey )
214
+ private function setupDelay (int $ delay , ?string $ routingKey )
205
215
{
206
216
if (!$ this ->channel ()->isConnected ()) {
207
217
$ this ->clear ();
@@ -210,7 +220,7 @@ private function setupDelay(int $delay, ?string $exchangeRoutingKey)
210
220
$ exchange = $ this ->getDelayExchange ();
211
221
$ exchange ->declareExchange ();
212
222
213
- $ queue = $ this ->createDelayQueue ($ delay , $ exchangeRoutingKey );
223
+ $ queue = $ this ->createDelayQueue ($ delay , $ routingKey );
214
224
$ queue ->declareQueue ();
215
225
$ queue ->bind ($ exchange ->getName (), $ this ->getRoutingKeyForDelay ($ delay ));
216
226
}
@@ -235,7 +245,7 @@ private function getDelayExchange(): \AMQPExchange
235
245
* which is the original exchange, resulting on it being put back into
236
246
* the original queue.
237
247
*/
238
- private function createDelayQueue (int $ delay , ?string $ exchangeRoutingKey )
248
+ private function createDelayQueue (int $ delay , ?string $ routingKey )
239
249
{
240
250
$ delayConfiguration = $ this ->connectionConfiguration ['delay ' ];
241
251
@@ -246,10 +256,10 @@ private function createDelayQueue(int $delay, ?string $exchangeRoutingKey)
246
256
'x-dead-letter-exchange ' => $ this ->exchange ()->getName (),
247
257
]);
248
258
249
- $ exchangeRoutingKey = $ exchangeRoutingKey ?? $ this ->getExchangeRoutingKey ();
250
- if (null !== $ exchangeRoutingKey ) {
259
+ $ routingKey = $ routingKey ?? $ this ->getDefaultPublishRoutingKey ();
260
+ if (null !== $ routingKey ) {
251
261
// after being released from to DLX, this routing key will be used
252
- $ queue ->setArgument ('x-dead-letter-routing-key ' , $ exchangeRoutingKey );
262
+ $ queue ->setArgument ('x-dead-letter-routing-key ' , $ routingKey );
253
263
}
254
264
255
265
return $ queue ;
@@ -261,18 +271,18 @@ private function getRoutingKeyForDelay(int $delay): string
261
271
}
262
272
263
273
/**
264
- * Waits and gets a message from the configured queue.
274
+ * Gets a messa
10000
ge from the specified queue.
265
275
*
266
276
* @throws \AMQPException
267
277
*/
268
- public function get (): ?\AMQPEnvelope
278
+ public function get (string $ queueName ): ?\AMQPEnvelope
269
279
{
270
280
if ($ this ->shouldSetup ()) {
271
281
$ this ->setup ();
272
282
}
273
283
274
284
try {
275
- if (false !== $ message = $ this ->queue ()->get ()) {
285
+ if (false !== $ message = $ this ->queue ($ queueName )->get ()) {
276
286
return $ message ;
277
287
}
278
288
} catch (\AMQPQueueException $ e ) {
@@ -289,14 +299,14 @@ public function get(): ?\AMQPEnvelope
289
299
return null ;
290
300
}
291
301
292
- public function ack (\AMQPEnvelope $ message ): bool
302
+ public function ack (\AMQPEnvelope $ message, string $ queueName ): bool
293
303
{
294
- return $ this ->queue ()->ack ($ message ->getDeliveryTag ());
304
+ return $ this ->queue ($ queueName )->ack ($ message ->getDeliveryTag ());
295
305
}
296
306
297
- public function nack (\AMQPEnvelope $ message , int $ flags = AMQP_NOPARAM ): bool
307
+ public function nack (\AMQPEnvelope $ message , string $ queueName , int $ flags = AMQP_NOPARAM ): bool
298
308
{
299
- return $ this ->queue ()->nack ($ message ->getDeliveryTag (), $ flags );
309
+ return $ this ->queue ($ queueName )->nack ($ message ->getDeliveryTag (), $ flags );
300
310
}
301
311
302
312
public function setup (): void
@@ -307,10 +317,25 @@ public function setup(): void
307
317
308
318
$ this ->exchange ()->declareExchange ();
309
319
310
- $ this ->queue ()->declareQueue ();
311
- $ this ->queue ()->bind ($ this ->exchange ()->getName (), $ this ->queueConfiguration ['routing_key ' ] ?? null );
320
+ foreach ($ this ->queuesConfiguration as $ queueName => $ queueConfig ) {
321
+ $ this ->queue ($ queueName )->declareQueue ();
322
+ foreach ($ queueConfig ['routing_keys ' ] ?? [] as $ routingKey ) {
323
+ $ this ->queue ($ queueName )->bind ($ this ->exchange ()->getName (), $ routingKey );
324
+ }
325
+ }
312
326
}
313
327
328
+ /**
329
+ * @return string[]
330
+ */
331
+ public function getAllQueueNames (): array
332
+ {
333
+ return array_keys ($ this ->queuesConfiguration );
334
+ }
335
+
336
+ /**
337
+ * @internal
338
+ */
314
339
public function channel (): \AMQPChannel
315
340
{
316
341
if (null === $ this ->amqpChannel ) {
@@ -335,22 +360,29 @@ public function channel(): \AMQPChannel
335
360
return $ this ->amqpChannel ;
336
361
}
337
362
338
- public function queue (): \AMQPQueue
363
+ /**
364
+ * @internal
365
+ */
366
+ public function queue (string $ queueName ): \AMQPQueue
339
367
{
340
- if (null === $ this ->amqpQueue ) {
341
- $ this ->amqpQueue = $ this ->amqpFactory ->createQueue ($ this ->channel ());
342
- $ this ->amqpQueue ->setName ($ this ->queueConfiguration ['name ' ]);
343
- $ this ->amqpQueue ->setFlags ($ this ->queueConfiguration ['flags ' ] ?? AMQP_DURABLE );
368
+ if (!isset ($ this ->amqpQueues [$ queueName ])) {
369
+ $ queueConfig = $ this ->queuesConfiguration [$ queueName ];
344
370
345
- if (isset ($ this ->queueConfiguration ['arguments ' ])) {
346
- $ this ->amqpQueue ->setArguments ($ this ->queueConfiguration ['arguments ' ]);
371
+ $ amqpQueue = $ this ->amqpFactory ->createQueue ($ this ->channel ());
372
+ $ amqpQueue ->setName ($ queueConfig ['name ' ]);
373
+ $ amqpQueue ->setFlags ($ queueConfig ['flags ' ] ?? AMQP_DURABLE );
374
+
375
+ if (isset ($ queueConfig ['arguments ' ])) {
376
+ $ amqpQueue ->setArguments ($ queueConfig ['arguments ' ]);
347
377
}
378
+
379
+ $ this ->amqpQueues [$ queueName ] = $ amqpQueue ;
348
380
}
349
381
350
- return $ this ->amqpQueue ;
382
+ return $ this ->amqpQueues [ $ queueName ] ;
351
383
}
352
384
353
- public function exchange (): \AMQPExchange
385
+ private function exchange (): \AMQPExchange
354
386
{
355
387
if (null === $ this ->amqpExchange ) {
356
388
$ this ->amqpExchange = $ this ->amqpFactory ->createExchange ($ this ->channel ());
@@ -374,7 +406,7 @@ public function getConnectionConfiguration(): array
374
406
private function clear (): void
375
407
{
376
408
$ this ->amqpChannel = null ;
377
- $ this ->amqpQueue = null ;
409
+ $ this ->amqpQueues = [] ;
378
410
$ this ->amqpExchange = null ;
379
411
}
380
412
@@ -391,19 +423,8 @@ private function shouldSetup(): bool
391
423
return true ;
392
424
}
393
425
394
- private function getAttributes ( array $ headers ): array
426
+ private function getDefaultPublishRoutingKey ( ): ? string
395
427
{
396
- return array_merge_recursive ($ this ->queueConfiguration ['attributes ' ] ?? [], ['headers ' => $ headers ]);
397
- }
398
-
399
- private function getExchangeRoutingKey (): ?string
400
- {
401
- $ routingKey = $ this ->exchangeConfiguration ['routing_key ' ] ?? null ;
402
- if (null === $ routingKey && isset ($ this ->queueConfiguration ['routing_key ' ])) {
403
- $ routingKey = $ this ->queueConfiguration ['routing_key ' ];
404
- @trigger_error ('Routing key from "queue" configuration is deprecated. Use "exchange" configuration instead. ' , E_USER_DEPRECATED );
405
- }
406
-
407
- return $ routingKey ;
428
+ return $ this ->exchangeConfiguration ['default_publish_routing_key ' ] ?? null ;
408
429
}
409
430
}
0 commit comments