8000 allowing multiple queues for AMQP · symfony/symfony@ad8f4d8 · GitHub
[go: up one dir, main page]

Skip to content

Commit ad8f4d8

Browse files
committed
allowing multiple queues for AMQP
1 parent 9e170c0 commit ad8f4d8

File tree

6 files changed

+91
-66
lines changed

6 files changed

+91
-66
lines changed

UPGRADE-4.3.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ Messenger
9191

9292
* `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead.
9393
* Deprecated the `LoggingMiddleware` class, pass a logger to `SendMessageMiddleware` instead.
94-
* Deprecated routing key from queue configuration (`queue[routing_key]` in the DSN), use exchange configuration instead (`exchange[routing_key]` in the DSN).
9594

9695
Routing
9796
-------

UPGRADE-5.0.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,6 @@ Messenger
236236
---------
237237

238238
* The `LoggingMiddleware` class has been removed, pass a logger to `SendMessageMiddleware` instead.
239-
* Routing key from queue configuration has been removed. Use exchange configuration instead (`exchange[routing_key]` in DSN).
240239

241240
Monolog
242241
-------

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
8+
* Removed publishing with a `routing_key` option from queue configuration, for
9+
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
710
* Added optional parameter `prefetch_count` in connection configuration,
811
to setup channel prefetch count
912
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
@@ -53,8 +56,6 @@ CHANGELOG
5356
and queues by default. Previously, this was done when in "debug" mode
5457
only. Pass the `auto_setup` connection option to control this.
5558
* Added a `SetupTransportsCommand` command to setup the transports
56-
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
57-
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.
5859

5960
4.2.0
6061
-----

A3DB ‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,15 @@ public function receive(callable $handler): void
4545
{
4646
while (!$this->shouldStop) {
4747
try {
48-
$amqpEnvelope = $this->connection->get();
48+
// TODO - update this after #30708 is merged
49+
$amqpEnvelope = null;
50+
foreach ($this->getAllQueueNames() as $queueName) {
51+
$amqpEnvelope = $this->connection->get($queueName);
52+
53+
if (null !== $amqpEnvelope) {
54+
break;
55+
}
56+
}
4957
} catch (\AMQPException $exception) {
5058
throw new TransportException($exception->getMessage(), 0, $exception);
5159
}

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@
2020
*/
2121
final class AmqpRoutingKeyStamp implements StampInterface
2222
{
23-
/**
24-
* @var string
25-
*/
2623
private $routingKey;
2724

2825
public function __construct(string $routingKey)

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 79 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class Connection
3535

3636
private $connectionConfiguration;
3737
private $exchangeConfiguration;
38-
private $queueConfiguration;
38+
private $queuesConfiguration;
3939
private $amqpFactory;
4040

4141
/**
@@ -51,7 +51,7 @@ class Connection
5151
/**
5252
* @var \AMQPQueue|null
5353
*/
54-
private $amqpQueue;
54+
private $amqpQueues = [];
5555

5656
/**
5757
* @var \AMQPExchange|null
@@ -68,14 +68,14 @@ class Connection
6868
* * vhost: Virtual Host to use with the AMQP service
6969
* * user: Username to use to connect the the AMQP service
7070
* * password: Password to use the connect to the AMQP service
71-
* * queue:
72-
* * name: Name of the queue
73-
* * routing_key: The routing key (if any) to use to push the messages to
71+
* * queues[name]: An array of queues, keyed by the name
72+
* * routing_keys: The routing keys (if any) to bind to this queue
7473
* * flags: Queue flags (Default: AMQP_DURABLE)
7574
* * arguments: Extra arguments
7675
* * exchange:
7776
* * name: Name of the exchange
7877
* * type: Type of exchange (Default: fanout)
78+
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
7979
* * flags: Exchange flags (Default: AMQP_DURABLE)
8080
* * arguments: Extra arguments
8181
* * delay:
@@ -86,7 +86,7 @@ class Connection
8686
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
8787
* * prefetch_count: set channel prefetch count
8888
*/
89-
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
89+
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queuesConfiguration, AmqpFactory $amqpFactory = null)
9090
{
9191
$this->connectionConfiguration = array_replace_recursive([
9292
'delay' => [
@@ -96,7 +96,7 @@ public function __construct(array $connectionConfiguration, array $exchangeConfi
9696
],
9797
], $connectionConfiguration);
9898
$this->exchangeConfiguration = $exchangeConfiguration;
99-
$this->queueConfiguration = $queueConfiguration;
99+
$this->queuesConfiguration = $queuesConfiguration;
100100
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
101101
}
102102

@@ -111,8 +111,10 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
111111
'host' => $parsedUrl['host'] ?? 'localhost',
112112
'port' => $parsedUrl['port'] ?? 5672,
113113
'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
114-
'queue' => [
115-
'name' => $queueName = $pathParts[1] ?? 'messages',
114+
'queues' => [
115+
[
116+
'name' => $queueName = $pathParts[1] ?? 'messages',
117+
]
116118
],
117119
'exchange' => [
118120
'name' => $queueName,
@@ -134,14 +136,18 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
134136
}
135137

136138
$exchangeOptions = $amqpOptions['exchange'];
137-
$queueOptions = $amqpOptions['queue'];
138-
unset($amqpOptions['queue'], $amqpOptions['exchange']);
139+
$queuesOptions = $amqpOptions['queues'];
140+
unset($amqpOptions['queues'], $amqpOptions['exchange']);
139141

140-
if (\is_array($queueOptions['arguments'] ?? false)) {
141-
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
142-
}
142+
$queuesOptions = array_map(function(array $queueOptions) {
143+
if (\is_array($queuesOptions['arguments'] ?? false)) {
144+
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
145+
}
146+
147+
return $queueOptions;
148+
}, $queuesOptions);
143149

144-
return new self($amqpOptions, $exchangeOptions, $queueOptions, $amqpFactory);
150+
return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
145151
}
146152

147153
private static function normalizeQueueArguments(array $arguments): array
@@ -178,9 +184,11 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
178184
$this->setup();
179185
}
180186

181-
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
182-
$attributes = $this->getAttributes($headers);
183-
$routingKey = $routingKey ?? $this->getExchangeRoutingKey();
187+
// TODO - allow flag & attributes to be configured on the message
188+
$flags = [];
189+
$attributes = [];
190+
$attributes = array_merge_recursive($attributes, ['headers' => $headers]);
191+
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();
184192

185193
$this->exchange()->publish($body, $routingKey, $flags, $attributes);
186194
}
@@ -194,14 +202,16 @@ private function publishWithDelay(string $body, array $headers, int $delay, ?str
194202
$this->setupDelay($delay, $exchangeRoutingKey);
195203
}
196204

205+
// TODO - allow flag & attributes to be configured on the message
206+
$flags = [];
207+
$attributes = [];
208+
$attributes = array_merge_recursive($attributes, ['headers' => $headers]);
197209
$routingKey = $this->getRoutingKeyForDelay($delay);
198-
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
199-
$attributes = $this->getAttributes($headers);
200210

201211
$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes);
202212
}
203213

204-
private function setupDelay(int $delay, ?string $exchangeRoutingKey)
214+
private function setupDelay(int $delay, ?string $routingKey)
205215
{
206216
if (!$this->channel()->isConnected()) {
207217
$this->clear();
@@ -210,7 +220,7 @@ private function setupDelay(int $delay, ?string $exchangeRoutingKey)
210220
$exchange = $this->getDelayExchange();
211221
$exchange->declareExchange();
212222

213-
$queue = $this->createDelayQueue($delay, $exchangeRoutingKey);
223+
$queue = $this->createDelayQueue($delay, $routingKey);
214224
$queue->declareQueue();
215225
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
216226
}
@@ -235,7 +245,7 @@ private function getDelayExchange(): \AMQPExchange
235245
* which is the original exchange, resulting on it being put back into
236246
* the original queue.
237247
*/
238-
private function createDelayQueue(int $delay, ?string $exchangeRoutingKey)
248+
private function createDelayQueue(int $delay, ?string $routingKey)
239249
{
240250
$delayConfiguration = $this->connectionConfiguration['delay'];
241251

@@ -246,10 +256,10 @@ private function createDelayQueue(int $delay, ?string $exchangeRoutingKey)
246256
'x-dead-letter-exchange' => $this->exchange()->getName(),
247257
]);
248258

249-
$exchangeRoutingKey = $exchangeRoutingKey ?? $this->getExchangeRoutingKey();
250-
if (null !== $exchangeRoutingKey) {
259+
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();
260+
if (null !== $routingKey) {
251261
// after being released from to DLX, this routing key will be used
252-
$queue->setArgument('x-dead-letter-routing-key', $exchangeRoutingKey);
262+
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
253263
}
254264

255265
return $queue;
@@ -261,18 +271,18 @@ private function getRoutingKeyForDelay(int $delay): string
261271
}
262272

263273
/**
264-
* Waits and gets a message from the configured queue.
274+
* Gets a messa 10000 ge from the specified queue.
265275
*
266276
* @throws \AMQPException
267277
*/
268-
public function get(): ?\AMQPEnvelope
278+
public function get(string $queueName): ?\AMQPEnvelope
269279
{
270280
if ($this->shouldSetup()) {
271281
$this->setup();
272282
}
273283

274284
try {
275-
if (false !== $message = $this->queue()->get()) {
285+
if (false !== $message = $this->queue($queueName)->get()) {
276286
return $message;
277287
}
278288
} catch (\AMQPQueueException $e) {
@@ -289,14 +299,14 @@ public function get(): ?\AMQPEnvelope
289299
return null;
290300
}
291301

292-
public function ack(\AMQPEnvelope $message): bool
302+
public function ack(\AMQPEnvelope $message, string $queueName): bool
293303
{
294-
return $this->queue()->ack($message->getDeliveryTag());
304+
return $this->queue($queueName)->ack($message->getDeliveryTag());
295305
}
296306

297-
public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool
307+
public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQP_NOPARAM): bool
298308
{
299-
return $this->queue()->nack($message->getDeliveryTag(), $flags);
309+
return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags);
300310
}
301311

302312
public function setup(): void
@@ -307,10 +317,25 @@ public function setup(): void
307317

308318
$this->exchange()->declareExchange();
309319

310-
$this->queue()->declareQueue();
311-
$this->queue()->bind($this->exchange()->getName(), $this->queueConfiguration['routing_key'] ?? null);
320+
foreach ($this->queuesConfiguration as $queueName => $queueConfig) {
321+
$this->queue($queueName)->declareQueue();
322+
foreach ($queueConfig['routing_keys'] ?? [] as $routingKey) {
323+
$this->queue($queueName)->bind($this->exchange()->getName(), $routingKey);
324+
}
325+
}
312326
}
313327

328+
/**
329+
* @return string[]
330+
*/
331+
public function getAllQueueNames(): array
332+
{
333+
return array_keys($this->queuesConfiguration);
334+
}
335+
336+
/**
337+
* @internal
338+
*/
314339
public function channel(): \AMQPChannel
315340
{
316341
if (null === $this->amqpChannel) {
@@ -335,22 +360,29 @@ public function channel(): \AMQPChannel
335360
return $this->amqpChannel;
336361
}
337362

338-
public function queue(): \AMQPQueue
363+
/**
364+
* @internal
365+
*/
366+
public function queue(string $queueName): \AMQPQueue
339367
{
340-
if (null === $this->amqpQueue) {
341-
$this->amqpQueue = $this->amqpFactory->createQueue($this->channel());
342-
$this->amqpQueue->setName($this->queueConfiguration['name']);
343-
$this->amqpQueue->setFlags($this->queueConfiguration['flags'] ?? AMQP_DURABLE);
368+
if (!isset($this->amqpQueues[$queueName])) {
369+
$queueConfig = $this->queuesConfiguration[$queueName];
344370

345-
if (isset($this->queueConfiguration['arguments'])) {
346-
$this->amqpQueue->setArguments($this->queueConfiguration['arguments']);
371+
$amqpQueue = $this->amqpFactory->createQueue($this->channel());
372+
$amqpQueue->setName($queueConfig['name']);
373+
$amqpQueue->setFlags($queueConfig['flags'] ?? AMQP_DURABLE);
374+
375+
if (isset($queueConfig['arguments'])) {
376+
$amqpQueue->setArguments($queueConfig['arguments']);
347377
}
378+
379+
$this->amqpQueues[$queueName] = $amqpQueue;
348380
}
349381

350-
return $this->amqpQueue;
382+
return $this->amqpQueues[$queueName];
351383
}
352384

353-
public function exchange(): \AMQPExchange
385+
private function exchange(): \AMQPExchange
354386
{
355387
if (null === $this->amqpExchange) {
356388
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
@@ -374,7 +406,7 @@ public function getConnectionConfiguration(): array
374406
private function clear(): void
375407
{
376408
$this->amqpChannel = null;
377-
$this->amqpQueue = null;
409+
$this->amqpQueues = [];
378410
$this->amqpExchange = null;
379411
}
380412

@@ -391,19 +423,8 @@ private function shouldSetup(): bool
391423
return true;
392424
}
393425

394-
private function getAttributes(array $headers): array
426+
private function getDefaultPublishRoutingKey(): ?string
395427
{
396-
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
397-
}
398-
399-
private function getExchangeRoutingKey(): ?string
400-
{
401-
$routingKey = $this->exchangeConfiguration['routing_key'] ?? null;
402-
if (null === $routingKey && isset($this->queueConfiguration['routing_key'])) {
403-
$routingKey = $this->queueConfiguration['routing_key'];
404-
@trigger_error('Routing key from "queue" configuration is deprecated. Use "exchange" configuration instead.', E_USER_DEPRECATED);
405-
}
406-
407-
return $routingKey;
428+
return $this->exchangeConfiguration['default_publish_routing_key'] ?? null;
408429
}
409430
}

0 commit comments

Comments
 (0)
0