8000 [Messenger] AMQP configurable routing key & multiple queues by weaverryan · Pull Request #30770 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] AMQP configurable routing key & multiple queues #30770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
allowing multiple queues for AMQP
  • Loading branch information
weaverryan committed Mar 29, 2019
commit 34441730ec1b0a49585c8202b823b12212756c3d
1 change: 0 additions & 1 deletion UPGRADE-4.3.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ Messenger

* `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead.
* Deprecated the `LoggingMiddleware` class, pass a logger to `SendMessageMiddleware` instead.
* Deprecated routing key from queue configuration (`queue[routing_key]` in the DSN), use exchange configuration instead (`exchange[routing_key]` in the DSN).

Routing
-------
Expand Down
1 change: 0 additions & 1 deletion UPGRADE-5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ Messenger
---------

* The `LoggingMiddleware` class has been removed, pass a logger to `SendMessageMiddleware` instead.
* Routing key from queue configuration has been removed. Use exchange configuration instead (`exchange[routing_key]` in DSN).

Monolog
-------
Expand Down
5 changes: 3 additions & 2 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ CHANGELOG
4.3.0
-----

* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Removed publishing with a `routing_key` option from queue configuration, for
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
Expand Down Expand Up @@ -53,8 +56,6 @@ CHANGELOG
and queues by default. Previously, this was done when in "debug" mode
only. Pass the `auto_setup` connection option to control this.
* Added a `SetupTransportsCommand` com 8000 mand to setup the transports
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.

4.2.0
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,15 @@ public function receive(callable $handler): void
{
while (!$this->shouldStop) {
try {
$amqpEnvelope = $this->connection->get();
// TODO - update this after #30708 is merged
$amqpEnvelope = null;
foreach ($this->getAllQueueNames() as $queueName) {
$amqpEnvelope = $this->connection->get($queueName);

if (null !== $amqpEnvelope) {
break;
}
}
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
*/
final class AmqpRoutingKeyStamp implements StampInterface
{
/**
* @var string
*/
private $routingKey;

public function __construct(string $routingKey)
Expand Down
137 changes: 79 additions & 58 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Connection

private $connectionConfiguration;
private $exchangeConfiguration;
private $queueConfiguration;
private $queuesConfiguration;
private $amqpFactory;

/**
Expand All @@ -51,7 +51,7 @@ class Connection
/**
* @var \AMQPQueue|null
*/
private $amqpQueue;
private $amqpQueues = [];

/**
* @var \AMQPExchange|null
Expand All @@ -68,14 +68,14 @@ class Connection
* * vhost: Virtual Host to use with the AMQP service
* * user: Username to use to connect the the AMQP service
* * password: Password to use the connect to the AMQP service
* * queue:
* * name: Name of the queue
* * routing_key: The routing key (if any) to use to push the messages to
* * queues[name]: An array of queues, keyed by the name
* * routing_keys: The routing keys (if any) to bind to this queue
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * exchange:
* * name: Name of the exchange
* * type: Type of exchange (Default: fanout)
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
Expand All @@ -86,7 +86,7 @@ class Connection
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
* * prefetch_count: set channel prefetch count
*/
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queuesConfiguration, AmqpFactory $amqpFactory = null)
{
$this->connectionConfiguration = array_replace_recursive([
'delay' => [
Expand All @@ -96,7 +96,7 @@ public function __construct(array $connectionConfiguration, array $exchangeConfi
],
], $connectionConfiguration);
$this->exchangeConfiguration = $exchangeConfiguration;
$this->queueConfiguration = $queueConfiguration;
$this->queuesConfiguration = $queuesConfiguration;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}

Expand All @@ -111,8 +111,10 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
'host' => $parsedUrl['host'] ?? 'localhost',
'port' => $parsedUrl['port'] ?? 5672,
'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
'queue' => [
'name' => $queueName = $pathParts[1] ?? 'messages',
'queues' => [
[
'name' => $queueName = $pathParts[1] ?? 'messages',
],
],
'exchange' => [
'name' => $queueName,
Expand All @@ -134,14 +136,18 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
}

$exchangeOptions = $amqpOptions['exchange'];
$queueOptions = $amqpOptions['queue'];
unset($amqpOptions['queue'], $amqpOptions['exchange']);
$queuesOptions = $amqpOptions['queues'];
unset($amqpOptions['queues'], $amqpOptions['exchange']);

if (\is_array($queueOptions['arguments'] ?? false)) {
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
}
$queuesOptions = array_map(function (array $queueOptions) {
if (\is_array($queuesOptions['arguments'] ?? false)) {
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
}

return $queueOptions;
}, $queuesOptions);

return new self($amqpOptions, $exchangeOptions, $queueOptions, $amqpFactory);
return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
}

private static function normalizeQueueArguments(array $arguments): array
Expand Down Expand Up @@ -178,9 +184,11 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
$this->setup();
}

$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
$attributes = $this->getAttributes($headers);
$routingKey = $routingKey ?? $this->getExchangeRoutingKey();
// TODO - allow flag & attributes to be configured on the message
$flags = [];
$attributes = [];
$attributes = array_merge_recursive($attributes, ['headers' => $headers]);
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();

$this->exchange()->publish($body, $routingKey, $flags, $attributes);
}
Expand All @@ -194,14 +202,16 @@ private function publishWithDelay(string $body, array $headers, int $delay, ?str
$this->setupDelay($delay, $exchangeRoutingKey);
}

// TODO - allow flag & attributes to be configured on the message
$flags = [];
$attributes = [];
$attributes = array_merge_recursive($attributes, ['headers' => $headers]);
$routingKey = $this->getRoutingKeyForDelay($delay);
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
$attributes = $this->getAttributes($headers);

$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes);
}

private function setupDelay(int $delay, ?string $exchangeRoutingKey)
private function setupDelay(int $delay, ?string $routingKey)
{
if (!$this->channel()->isConnected()) {
$this->clear();
Expand All @@ -210,7 +220,7 @@ private function setupDelay(int $delay, ?string $exchangeRoutingKey)
$exchange = $this->getDelayExchange();
$exchange->declareExchange();

$queue = $this->createDelayQueue($delay, $exchangeRoutingKey);
$queue = $this->createDelayQueue($delay, $routingKey);
$queue->declareQueue();
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
}
Expand All @@ -235,7 +245,7 @@ private function getDelayExchange(): \AMQPExchange
* which is the original exchange, resulting on it being put back into
* the original queue.
*/
private function createDelayQueue(int $delay, ?string $exchangeRoutingKey)
private function createDelayQueue(int $delay, ?string $routingKey)
{
$delayConfiguration = $this->connectionConfiguration['delay'];

Expand All @@ -246,10 +256,10 @@ private function createDelayQueue(int $delay, ?string $exchangeRoutingKey)
'x-dead-letter-exchange' => $this->exchange()->getName(),
]);

$exchangeRoutingKey = $exchangeRoutingKey ?? $this->getExchangeRoutingKey();
if (null !== $exchangeRoutingKey) {
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();
if (null !== $routingKey) {
// after being released from to DLX, this routing key will be used
$queue->setArgument('x-dead-letter-routing-key', $exchangeRoutingKey);
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
}

return $queue;
Expand All @@ -261,18 +271,18 @@ private function getRoutingKeyForDelay(int $delay): string
}

/**
* Waits and gets a message from the configured queue.
* Gets a message from the specified queue.
*
* @throws \AMQPException
*/
public function get(): ?\AMQPEnvelope
public function get(string $queueName): ?\AMQPEnvelope
{
if ($this->shouldSetup()) {
$this->setup();
}

try {
if (false !== $message = $this->queue()->get()) {
if (false !== $message = $this->queue($queueName)->get()) {
return $message;
}
} catch (\AMQPQueueException $e) {
Expand All @@ -289,14 +299,14 @@ public function get(): ?\AMQPEnvelope
return null;
}

public function ack(\AMQPEnvelope $message): bool
public function ack(\AMQPEnvelope $message, string $queueName): bool
{
return $this->queue()->ack($message->getDeliveryTag());
return $this->queue($queueName)->ack($message->getDeliveryTag());
}

public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool
public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQP_NOPARAM): bool
{
return $this->queue()->nack($message->getDeliveryTag(), $flags);
return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags);
}

public function setup(): void
Expand All @@ -307,10 +317,25 @@ public function setup(): void

$this->exchange()->declareExchange();

$this->queue()->declareQueue();
$this->queue()->bind($this->exchange()->getName(), $this->queueConfiguration['routing_key'] ?? null);
foreach ($this->queuesConfiguration as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['routing_keys'] ?? [] as $routingKey) {
Copy link
@bentcoder bentcoder Mar 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@weaverryan Just to confirm, as far as I understand, this line addresses the first point (AMQP configuration) I made here. Am I correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure exactly which was your first point. But this part allows you to declare that you want messenger to create multiple queues on your behalf and create multiple binds for each queue. I’m pretty sure this was at least one of your issues :). This part is all about creating and binding of queues - these specific lines say nothing about the publishing process or what routing keys are used.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@weaverryan , don't forget to rename routing_keys to binding_keys for this binding part (also in constructor doc)

10000
$this->queue($queueName)->bind($this->exchange()->getName(), $routingKey);
}
}
}

/**
* @return string[]
*/
public function getAllQueueNames(): array
{
return array_keys($this->queuesConfiguration);
}

/**
* @internal
*/
public function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {
Expand All @@ -335,22 +360,29 @@ public function channel(): \AMQPChannel
return $this->amqpChannel;
}

public function queue(): \AMQPQueue
/**
* @internal
*/
public function queue(string $queueName): \AMQPQueue
{
if (null === $this->amqpQueue) {
$this->amqpQueue = $this->amqpFactory->createQueue($this->channel());
$this->amqpQueue->setName($this->queueConfiguration['name']);
$this->amqpQueue->setFlags($this->queueConfiguration['flags'] ?? AMQP_DURABLE);
if (!isset($this->amqpQueues[$queueName])) {
$queueConfig = $this->queuesConfiguration[$queueName];

if (isset($this->queueConfiguration['arguments'])) {
$this->amqpQueue->setArguments($this->queueConfiguration['arguments']);
$amqpQueue = $this->amqpFactory->createQueue($this->channel());
$amqpQueue->setName($queueConfig['name']);
$amqpQueue->setFlags($queueConfig['flags'] ?? AMQP_DURABLE);

if (isset($queueConfig['arguments'])) {
$amqpQueue->setArguments($queueConfig['arguments']);
}

$this->amqpQueues[$queueName] = $amqpQueue;
}

return $this->amqpQueue;
return $this->amqpQueues[$queueName];
}

public function exchange(): \AMQPExchange
private function exchange(): \AMQPExchange
{
if (null === $this->amqpExchange) {
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
Expand All @@ -374,7 +406,7 @@ public function getConnectionConfiguration(): array
private function clear(): void
{
$this->amqpChannel = null;
$this->amqpQueue = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
}

Expand All @@ -391,19 +423,8 @@ private function shouldSetup(): bool
return true;
}

private function getAttributes(array $headers): array
private function getDefaultPublishRoutingKey(): ?string
{
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
}

private function getExchangeRoutingKey(): ?string
{
$routingKey = $this->exchangeConfiguration['routing_key'] ?? null;
if (null === $routingKey && isset($this->queueConfiguration['routing_key'])) {
$routingKey = $this->queueConfiguration['routing_key'];
@trigger_error('Routing key from "queue" configuration is deprecated. Use "exchange" configuration instead.', E_USER_DEPRECATED);
}

return $routingKey;
return $this->exchangeConfiguration['default_publish_routing_key'] ?? null;
}
}
0