-
-
Notifications
You must be signed in to change notification settings - Fork 9.7k
[Messenger] AMQP configurable routing key & multiple queues #30770
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
weaverryan
wants to merge
3
commits into
symfony:master
from
weaverryan:amqp-multiple-queues-routing-keys
Closed
Changes from 1 commit
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev
Previous commit
allowing multiple queues for AMQP
- Loading branch information
commit 34441730ec1b0a49585c8202b823b12212756c3d
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,7 @@ class Connection | |
|
||
private $connectionConfiguration; | ||
private $exchangeConfiguration; | ||
private $queueConfiguration; | ||
private $queuesConfiguration; | ||
private $amqpFactory; | ||
|
||
/** | ||
|
@@ -51,7 +51,7 @@ class Connection | |
/** | ||
* @var \AMQPQueue|null | ||
*/ | ||
private $amqpQueue; | ||
private $amqpQueues = []; | ||
|
||
/** | ||
* @var \AMQPExchange|null | ||
|
@@ -68,14 +68,14 @@ class Connection | |
* * vhost: Virtual Host to use with the AMQP service | ||
* * user: Username to use to connect the the AMQP service | ||
* * password: Password to use the connect to the AMQP service | ||
* * queue: | ||
* * name: Name of the queue | ||
* * routing_key: The routing key (if any) to use to push the messages to | ||
* * queues[name]: An array of queues, keyed by the name | ||
* * routing_keys: The routing keys (if any) to bind to this queue | ||
* * flags: Queue flags (Default: AMQP_DURABLE) | ||
* * arguments: Extra arguments | ||
* * exchange: | ||
* * name: Name of the exchange | ||
* * type: Type of exchange (Default: fanout) | ||
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message | ||
* * flags: Exchange flags (Default: AMQP_DURABLE) | ||
* * arguments: Extra arguments | ||
* * delay: | ||
|
@@ -86,7 +86,7 @@ class Connection | |
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000) | ||
* * prefetch_count: set channel prefetch count | ||
*/ | ||
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null) | ||
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queuesConfiguration, AmqpFactory $amqpFactory = null) | ||
{ | ||
$this->connectionConfiguration = array_replace_recursive([ | ||
'delay' => [ | ||
|
@@ -96,7 +96,7 @@ public function __construct(array $connectionConfiguration, array $exchangeConfi | |
], | ||
], $connectionConfiguration); | ||
$this->exchangeConfiguration = $exchangeConfiguration; | ||
$this->queueConfiguration = $queueConfiguration; | ||
$this->queuesConfiguration = $queuesConfiguration; | ||
$this->amqpFactory = $amqpFactory ?: new AmqpFactory(); | ||
} | ||
|
||
|
@@ -111,8 +111,10 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am | |
'host' => $parsedUrl['host'] ?? 'localhost', | ||
'port' => $parsedUrl['port'] ?? 5672, | ||
'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/', | ||
'queue' => [ | ||
'name' => $queueName = $pathParts[1] ?? 'messages', | ||
'queues' => [ | ||
[ | ||
'name' => $queueName = $pathParts[1] ?? 'messages', | ||
], | ||
], | ||
'exchange' => [ | ||
'name' => $queueName, | ||
|
@@ -134,14 +136,18 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am | |
} | ||
|
||
$exchangeOptions = $amqpOptions['exchange']; | ||
$queueOptions = $amqpOptions['queue']; | ||
unset($amqpOptions['queue'], $amqpOptions['exchange']); | ||
$queuesOptions = $amqpOptions['queues']; | ||
unset($amqpOptions['queues'], $amqpOptions['exchange']); | ||
|
||
if (\is_array($queueOptions['arguments'] ?? false)) { | ||
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']); | ||
} | ||
$queuesOptions = array_map(function (array $queueOptions) { | ||
if (\is_array($queuesOptions['arguments'] ?? false)) { | ||
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']); | ||
} | ||
|
||
return $queueOptions; | ||
}, $queuesOptions); | ||
|
||
return new self($amqpOptions, $exchangeOptions, $queueOptions, $amqpFactory); | ||
return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory); | ||
} | ||
|
||
private static function normalizeQueueArguments(array $arguments): array | ||
|
@@ -178,9 +184,11 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin | |
$this->setup(); | ||
} | ||
|
||
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM; | ||
$attributes = $this->getAttributes($headers); | ||
$routingKey = $routingKey ?? $this->getExchangeRoutingKey(); | ||
// TODO - allow flag & attributes to be configured on the message | ||
$flags = []; | ||
$attributes = []; | ||
$attributes = array_merge_recursive($attributes, ['headers' => $headers]); | ||
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey(); | ||
|
||
$this->exchange()->publish($body, $routingKey, $flags, $attributes); | ||
} | ||
|
@@ -194,14 +202,16 @@ private function publishWithDelay(string $body, array $headers, int $delay, ?str | |
$this->setupDelay($delay, $exchangeRoutingKey); | ||
} | ||
|
||
// TODO - allow flag & attributes to be configured on the message | ||
$flags = []; | ||
$attributes = []; | ||
$attributes = array_merge_recursive($attributes, ['headers' => $headers]); | ||
$routingKey = $this->getRoutingKeyForDelay($delay); | ||
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM; | ||
$attributes = $this->getAttributes($headers); | ||
|
||
$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes); | ||
} | ||
|
||
private function setupDelay(int $delay, ?string $exchangeRoutingKey) | ||
private function setupDelay(int $delay, ?string $routingKey) | ||
{ | ||
if (!$this->channel()->isConnected()) { | ||
$this->clear(); | ||
|
@@ -210,7 +220,7 @@ private function setupDelay(int $delay, ?string $exchangeRoutingKey) | |
$exchange = $this->getDelayExchange(); | ||
$exchange->declareExchange(); | ||
|
||
$queue = $this->createDelayQueue($delay, $exchangeRoutingKey); | ||
$queue = $this->createDelayQueue($delay, $routingKey); | ||
$queue->declareQueue(); | ||
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay)); | ||
} | ||
|
@@ -235,7 +245,7 @@ private function getDelayExchange(): \AMQPExchange | |
* which is the original exchange, resulting on it being put back into | ||
* the original queue. | ||
*/ | ||
private function createDelayQueue(int $delay, ?string $exchangeRoutingKey) | ||
private function createDelayQueue(int $delay, ?string $routingKey) | ||
{ | ||
$delayConfiguration = $this->connectionConfiguration['delay']; | ||
|
||
|
@@ -246,10 +256,10 @@ private function createDelayQueue(int $delay, ?string $exchangeRoutingKey) | |
'x-dead-letter-exchange' => $this->exchange()->getName(), | ||
]); | ||
|
||
$exchangeRoutingKey = $exchangeRoutingKey ?? $this->getExchangeRoutingKey(); | ||
if (null !== $exchangeRoutingKey) { | ||
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey(); | ||
if (null !== $routingKey) { | ||
// after being released from to DLX, this routing key will be used | ||
$queue->setArgument('x-dead-letter-routing-key', $exchangeRoutingKey); | ||
$queue->setArgument('x-dead-letter-routing-key', $routingKey); | ||
} | ||
|
||
return $queue; | ||
|
@@ -261,18 +271,18 @@ private function getRoutingKeyForDelay(int $delay): string | |
} | ||
|
||
/** | ||
* Waits and gets a message from the configured queue. | ||
* Gets a message from the specified queue. | ||
* | ||
* @throws \AMQPException | ||
*/ | ||
public function get(): ?\AMQPEnvelope | ||
public function get(string $queueName): ?\AMQPEnvelope | ||
{ | ||
if ($this->shouldSetup()) { | ||
$this->setup(); | ||
} | ||
|
||
try { | ||
if (false !== $message = $this->queue()->get()) { | ||
if (false !== $message = $this->queue($queueName)->get()) { | ||
return $message; | ||
} | ||
} catch (\AMQPQueueException $e) { | ||
|
@@ -289,14 +299,14 @@ public function get(): ?\AMQPEnvelope | |
return null; | ||
} | ||
|
||
public function ack(\AMQPEnvelope $message): bool | ||
public function ack(\AMQPEnvelope $message, string $queueName): bool | ||
{ | ||
return $this->queue()->ack($message->getDeliveryTag()); | ||
return $this->queue($queueName)->ack($message->getDeliveryTag()); | ||
} | ||
|
||
public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool | ||
public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQP_NOPARAM): bool | ||
{ | ||
return $this->queue()->nack($message->getDeliveryTag(), $flags); | ||
return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags); | ||
} | ||
|
||
public function setup(): void | ||
|
@@ -307,10 +317,25 @@ public function setup(): void | |
|
||
$this->exchange()->declareExchange(); | ||
|
||
$this->queue()->declareQueue(); | ||
$this->queue()->bind($this->exchange()->getName(), $this->queueConfiguration['routing_key'] ?? null); | ||
foreach ($this->queuesConfiguration as $queueName => $queueConfig) { | ||
$this->queue($queueName)->declareQueue(); | ||
foreach ($queueConfig['routing_keys'] ?? [] as $routingKey) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @weaverryan , don't forget to rename |
||
$this->queue($queueName)->bind($this->exchange()->getName(), $routingKey); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* @return string[] | ||
*/ | ||
public function getAllQueueNames(): array | ||
{ | ||
return array_keys($this->queuesConfiguration); | ||
} | ||
|
||
/** | ||
* @internal | ||
*/ | ||
public function channel(): \AMQPChannel | ||
{ | ||
if (null === $this->amqpChannel) { | ||
|
@@ -335,22 +360,29 @@ public function channel(): \AMQPChannel | |
return $this->amqpChannel; | ||
} | ||
|
||
public function queue(): \AMQPQueue | ||
/** | ||
* @internal | ||
*/ | ||
public function queue(string $queueName): \AMQPQueue | ||
{ | ||
if (null === $this->amqpQueue) { | ||
$this->amqpQueue = $this->amqpFactory->createQueue($this->channel()); | ||
$this->amqpQueue->setName($this->queueConfiguration['name']); | ||
$this->amqpQueue->setFlags($this->queueConfiguration['flags'] ?? AMQP_DURABLE); | ||
if (!isset($this->amqpQueues[$queueName])) { | ||
$queueConfig = $this->queuesConfiguration[$queueName]; | ||
|
||
if (isset($this->queueConfiguration['arguments'])) { | ||
$this->amqpQueue->setArguments($this->queueConfiguration['arguments']); | ||
$amqpQueue = $this->amqpFactory->createQueue($this->channel()); | ||
$amqpQueue->setName($queueConfig['name']); | ||
$amqpQueue->setFlags($queueConfig['flags'] ?? AMQP_DURABLE); | ||
|
||
if (isset($queueConfig['arguments'])) { | ||
$amqpQueue->setArguments($queueConfig['arguments']); | ||
} | ||
|
||
$this->amqpQueues[$queueName] = $amqpQueue; | ||
} | ||
|
||
return $this->amqpQueue; | ||
return $this->amqpQueues[$queueName]; | ||
} | ||
|
||
public function exchange(): \AMQPExchange | ||
private function exchange(): \AMQPExchange | ||
{ | ||
if (null === $this->amqpExchange) { | ||
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel()); | ||
|
@@ -374,7 +406,7 @@ public function getConnectionConfiguration(): array | |
private function clear(): void | ||
{ | ||
$this->amqpChannel = null; | ||
$this->amqpQueue = null; | ||
$this->amqpQueues = []; | ||
$this->amqpExchange = null; | ||
} | ||
|
||
|
@@ -391,19 +423,8 @@ private function shouldSetup(): bool | |
return true; | ||
} | ||
|
||
private function getAttributes(array $headers): array | ||
private function getDefaultPublishRoutingKey(): ?string | ||
{ | ||
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]); | ||
} | ||
|
||
private function getExchangeRoutingKey(): ?string | ||
{ | ||
$routingKey = $this->exchangeConfiguration['routing_key'] ?? null; | ||
if (null === $routingKey && isset($this->queueConfiguration['routing_key'])) { | ||
$routingKey = $this->queueConfiguration['routing_key']; | ||
@trigger_error('Routing key from "queue" configuration is deprecated. Use "exchange" configuration instead.', E_USER_DEPRECATED); | ||
} | ||
|
||
return $routingKey; | ||
return $this->exchangeConfiguration['default_publish_routing_key'] ?? null; | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@weaverryan Just to confirm, as far as I understand, this line addresses the first point (AMQP configuration) I made here. Am I correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’m not sure exactly which was your first point. But this part allows you to declare that you want messenger to create multiple queues on your behalf and create multiple binds for each queue. I’m pretty sure this was at least one of your issues :). This part is all about creating and binding of queues - these specific lines say nothing about the publishing process or what routing keys are used.