From 8fddd02c114500744c70a73eab2d587412f9a1b9 Mon Sep 17 00:00:00 2001 From: Guillaume Gammelin Date: Mon, 28 Jan 2019 16:06:43 +0100 Subject: [PATCH 1/2] [messenger] Adds a stamp to provide a routing key on message publishing. --- UPGRADE-4.3.md | 1 + UPGRADE-5.0.md | 1 + src/Symfony/Component/Messenger/CHANGELOG.md | 3 +- .../AmqpExt/AmqpRoutingKeyStampTest.php | 24 +++++ .../Transport/AmqpExt/AmqpSenderTest.php | 16 ++++ .../Transport/AmqpExt/ConnectionTest.php | 88 +++++++++++++++++-- .../Transport/AmqpExt/AmqpRoutingKeyStamp.php | 37 ++++++++ .../Transport/AmqpExt/AmqpSender.php | 6 +- .../Transport/AmqpExt/Connection.php | 33 ++++--- 9 files changed, 192 insertions(+), 17 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php create mode 100644 src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php diff --git a/UPGRADE-4.3.md b/UPGRADE-4.3.md index e60f75789dea6..887c385fd3867 100644 --- a/UPGRADE-4.3.md +++ b/UPGRADE-4.3.md @@ -91,6 +91,7 @@ Messenger * `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead. * Deprecated the `LoggingMiddleware` class, pass a logger to `SendMessageMiddleware` instead. + * Deprecated routing key from queue configuration (`queue[routing_key]` in the DSN), use exchange configuration instead (`exchange[routing_key]` in the DSN). Routing ------- diff --git a/UPGRADE-5.0.md b/UPGRADE-5.0.md index ec6f60951ec61..eb300563d2151 100644 --- a/UPGRADE-5.0.md +++ b/UPGRADE-5.0.md @@ -236,6 +236,7 @@ Messenger --------- * The `LoggingMiddleware` class has been removed, pass a logger to `SendMessageMiddleware` instead. + * Routing key from queue configuration has been removed. Use exchange configuration instead (`exchange[routing_key]` in DSN). Monolog ------- diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 9638de4e49092..85493e0c93d22 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -52,8 +52,9 @@ CHANGELOG * [BC BREAK] The Amqp Transport now automatically sets up the exchanges and queues by default. Previously, this was done when in "debug" mode only. Pass the `auto_setup` connection option to control this. - * Added a `SetupTransportsCommand` command to setup the transports + * Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing. + * Deprecated publishing with a routing key from queue configuration, use exchange configuration instead. 4.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php new file mode 100644 index 0000000000000..895e41b2620e9 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php @@ -0,0 +1,24 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp; + +class AmqpRoutingKeyStampTest extends TestCase +{ + public function testStamp() + { + $stamp = new AmqpRoutingKeyStamp('routing_key'); + $this->assertSame('routing_key', $stamp->getRoutingKey()); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php index 14fee16ca6589..178d86a516860 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php @@ -14,6 +14,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -38,6 +39,21 @@ public function testItSendsTheEncodedMessage() $sender->send($envelope); } + public function testItSendsTheEncodedMessageUsingARoutingKey() + { + $envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk')); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $serializer = $this->createMock(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturn($encoded); + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk'); + + $sender = new AmqpSender($connection, $serializer); + $sender->send($envelope); + } + public function testItSendsTheEncodedMessageWithoutHeaders() { $envelope = new Envelope(new DummyMessage('Oy')); diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index 100aeb78e882d..46e4e39844208 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -207,10 +207,11 @@ public function testItSetupsTheConnectionByDefault() $amqpExchange->method('getName')->willReturn('exchange_name'); $amqpExchange->expects($this->once())->method('declareExchange'); + $amqpExchange->expects($this->once())->method('publish')->with('body', 'exchange_key', AMQP_NOPARAM, ['headers' => []]); $amqpQueue->expects($this->once())->method('declareQueue'); - $amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'my_key'); + $amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'queue_key'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory); + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=exchange_key&queue[routing_key]=queue_key', [], $factory); $connection->publish('body'); } @@ -228,13 +229,13 @@ public function testItCanDisableTheSetup() $amqpQueue->expects($this->never())->method('declareQueue'); $amqpQueue->expects($this->never())->method('bind'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory); + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory); $connection->publish('body'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => false], $factory); + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key', ['auto_setup' => false], $factory); $connection->publish('body'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto_setup=false', [], $factory); + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key&auto_setup=false', [], $factory); $connection->publish('body'); } @@ -375,6 +376,83 @@ public function testObfuscatePasswordInDsn() $connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory); $connection->channel(); } + + public function testItCanPublishWithTheDefaultQueueRoutingKey() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->once())->method('publish')->with('body', 'my_key'); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key', [], $factory); + $connection->publish('body'); + } + + public function testItCanPublishWithASuppliedRoutingKey() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->once())->method('publish')->with('body', 'supplied_key'); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key', [], $factory); + $connection->publish('body', [], 0, 'supplied_key'); + } + + public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument() + { + $amqpConnection = $this->createMock(\AMQPConnection::class); + $amqpChannel = $this->createMock(\AMQPChannel::class); + $delayQueue = $this->createMock(\AMQPQueue::class); + + $factory = $this->createMock(AmqpFactory::class); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createQueue')->willReturn($delayQueue); + $factory->method('createExchange')->will($this->onConsecutiveCalls( + $delayExchange = $this->createMock(\AMQPExchange::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + )); + + $amqpExchange->expects($this->once())->method('setName')->with('messages'); + $amqpExchange->method('getName')->willReturn('messages'); + + $delayExchange->expects($this->once())->method('setName')->with('delay'); + $delayExchange->expects($this->once())->method('declareExchange'); + $delayExchange->method('getName')->willReturn('delay'); + + $connectionOptions = [ + 'retry' => [ + 'dead_routing_key' => 'my_dead_routing_key', + ], + ]; + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory); + + $delayQueue->expects($this->once())->method('setName')->with('delay_queue_120000'); + $delayQueue->expects($this->once())->method('setArguments')->with([ + 'x-message-ttl' => 120000, + 'x-dead-letter-exchange' => 'messages', + ]); + $delayQueue->expects($this->once())->method('setArgument')->with( + 'x-dead-letter-routing-key', + 'routing_key' + ); + + $delayQueue->expects($this->once())->method('declareQueue'); + $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000'); + + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]); + $connection->publish('{}', [], 120000, 'routing_key'); + } } class TestAmqpFactory extends AmqpFactory diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php new file mode 100644 index 0000000000000..c306c478ede27 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\AmqpExt; + +use Symfony\Component\Messenger\Stamp\StampInterface; + +/** + * @author Guillaume Gammelin + * + * @experimental in 4.3 + */ +final class AmqpRoutingKeyStamp implements StampInterface +{ + /** + * @var string + */ + private $routingKey; + + public function __construct(string $routingKey) + { + $this->routingKey = $routingKey; + } + + public function getRoutingKey(): string + { + return $this->routingKey; + } +} diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index 021a6bfaf8b51..d53ecb3b8979e 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -51,7 +51,11 @@ public function send(Envelope $envelope): Envelope } try { - $this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay); + /** @var $routingKeyStamp AmqpRoutingKeyStamp */ + $routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class); + $routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null; + + $this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey); } catch (\AMQPException $e) { throw new TransportException($e->getMessage(), 0, $e); } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index 4dad0735b93e9..df51e6402ba58 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -166,10 +166,10 @@ private static function normalizeQueueArguments(array $arguments): array * * @throws \AMQPException */ - public function publish(string $body, array $headers = [], int $delay = 0): void + public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void { if (0 !== $delay) { - $this->publishWithDelay($body, $headers, $delay); + $this->publishWithDelay($body, $headers, $delay, $routingKey); return; } @@ -180,17 +180,18 @@ public function publish(string $body, array $headers = [], int $delay = 0): void $flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM; $attributes = $this->getAttributes($headers); + $routingKey = $routingKey ?? $this->getExchangeRoutingKey(); - $this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes); + $this->exchange()->publish($body, $routingKey, $flags, $attributes); } /** * @throws \AMQPException */ - private function publishWithDelay(string $body, array $headers = [], int $delay) + private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey) { if ($this->shouldSetup()) { - $this->setupDelay($delay); + $this->setupDelay($delay, $exchangeRoutingKey); } $routingKey = $this->getRoutingKeyForDelay($delay); @@ -200,7 +201,7 @@ private function publishWithDelay(string $body, array $headers = [], int $delay) $this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes); } - private function setupDelay(int $delay) + private function setupDelay(int $delay, ?string $exchangeRoutingKey) { if (!$this->channel()->isConnected()) { $this->clear(); @@ -209,7 +210,7 @@ private function setupDelay(int $delay) $exchange = $this->getDelayExchange(); $exchange->declareExchange(); - $queue = $this->createDelayQueue($delay); + $queue = $this->createDelayQueue($delay, $exchangeRoutingKey); $queue->declareQueue(); $queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay)); } @@ -234,7 +235,7 @@ private function getDelayExchange(): \AMQPExchange * which is the original exchange, resulting on it being put back into * the original queue. */ - private function createDelayQueue(int $delay) + private function createDelayQueue(int $delay, ?string $exchangeRoutingKey) { $delayConfiguration = $this->connectionConfiguration['delay']; @@ -245,9 +246,10 @@ private function createDelayQueue(int $delay) 'x-dead-letter-exchange' => $this->exchange()->getName(), ]); - if (isset($this->queueConfiguration['routing_key'])) { + $exchangeRoutingKey = $exchangeRoutingKey ?? $this->getExchangeRoutingKey(); + if (null !== $exchangeRoutingKey) { // after being released from to DLX, this routing key will be used - $queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']); + $queue->setArgument('x-dead-letter-routing-key', $exchangeRoutingKey); } return $queue; @@ -393,4 +395,15 @@ private function getAttributes(array $headers): array { return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]); } + + private function getExchangeRoutingKey(): ?string + { + $routingKey = $this->exchangeConfiguration['routing_key'] ?? null; + if (null === $routingKey && isset($this->queueConfiguration['routing_key'])) { + $routingKey = $this->queueConfiguration['routing_key']; + @trigger_error('Routing key from "queue" configuration is deprecated. Use "exchange" configuration instead.', E_USER_DEPRECATED); + } + + return $routingKey; + } } From 34441730ec1b0a49585c8202b823b12212756c3d Mon Sep 17 00:00:00 2001 From: Ryan Weaver Date: Fri, 29 Mar 2019 13:04:50 -0400 Subject: [PATCH 2/2] allowing multiple queues for AMQP --- UPGRADE-4.3.md | 1 - UPGRADE-5.0.md | 1 - src/Symfony/Component/Messenger/CHANGELOG.md | 5 +- .../Transport/AmqpExt/AmqpReceiver.php | 10 +- .../Transport/AmqpExt/AmqpRoutingKeyStamp.php | 3 - .../Transport/AmqpExt/Connection.php | 137 ++++++++++-------- 6 files changed, 91 insertions(+), 66 deletions(-) diff --git a/UPGRADE-4.3.md b/UPGRADE-4.3.md index 887c385fd3867..e60f75789dea6 100644 --- a/UPGRADE-4.3.md +++ b/UPGRADE-4.3.md @@ -91,7 +91,6 @@ Messenger * `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead. * Deprecated the `LoggingMiddleware` class, pass a logger to `SendMessageMiddleware` instead. - * Deprecated routing key from queue configuration (`queue[routing_key]` in the DSN), use exchange configuration instead (`exchange[routing_key]` in the DSN). Routing ------- diff --git a/UPGRADE-5.0.md b/UPGRADE-5.0.md index eb300563d2151..ec6f60951ec61 100644 --- a/UPGRADE-5.0.md +++ b/UPGRADE-5.0.md @@ -236,7 +236,6 @@ Messenger --------- * The `LoggingMiddleware` class has been removed, pass a logger to `SendMessageMiddleware` instead. - * Routing key from queue configuration has been removed. Use exchange configuration instead (`exchange[routing_key]` in DSN). Monolog ------- diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 85493e0c93d22..ea0a9ab367fa1 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -4,6 +4,9 @@ CHANGELOG 4.3.0 ----- + * Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing. + * Removed publishing with a `routing_key` option from queue configuration, for + AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead. * Added optional parameter `prefetch_count` in connection configuration, to setup channel prefetch count * New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware` @@ -53,8 +56,6 @@ CHANGELOG and queues by default. Previously, this was done when in "debug" mode only. Pass the `auto_setup` connection option to control this. * Added a `SetupTransportsCommand` command to setup the transports - * Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing. - * Deprecated publishing with a routing key from queue configuration, use exchange configuration instead. 4.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index 93afbaff5d8b8..7eeadef26a32d 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -45,7 +45,15 @@ public function receive(callable $handler): void { while (!$this->shouldStop) { try { - $amqpEnvelope = $this->connection->get(); + // TODO - update this after #30708 is merged + $amqpEnvelope = null; + foreach ($this->getAllQueueNames() as $queueName) { + $amqpEnvelope = $this->connection->get($queueName); + + if (null !== $amqpEnvelope) { + break; + } + } } catch (\AMQPException $exception) { throw new TransportException($exception->getMessage(), 0, $exception); } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php index c306c478ede27..b9f2aa2d81765 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php @@ -20,9 +20,6 @@ */ final class AmqpRoutingKeyStamp implements StampInterface { - /** - * @var string - */ private $routingKey; public function __construct(string $routingKey) diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index df51e6402ba58..f25453315644e 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -35,7 +35,7 @@ class Connection private $connectionConfiguration; private $exchangeConfiguration; - private $queueConfiguration; + private $queuesConfiguration; private $amqpFactory; /** @@ -51,7 +51,7 @@ class Connection /** * @var \AMQPQueue|null */ - private $amqpQueue; + private $amqpQueues = []; /** * @var \AMQPExchange|null @@ -68,14 +68,14 @@ class Connection * * vhost: Virtual Host to use with the AMQP service * * user: Username to use to connect the the AMQP service * * password: Password to use the connect to the AMQP service - * * queue: - * * name: Name of the queue - * * routing_key: The routing key (if any) to use to push the messages to + * * queues[name]: An array of queues, keyed by the name + * * routing_keys: The routing keys (if any) to bind to this queue * * flags: Queue flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * exchange: * * name: Name of the exchange * * type: Type of exchange (Default: fanout) + * * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message * * flags: Exchange flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * delay: @@ -86,7 +86,7 @@ class Connection * * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000) * * prefetch_count: set channel prefetch count */ - public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null) + public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queuesConfiguration, AmqpFactory $amqpFactory = null) { $this->connectionConfiguration = array_replace_recursive([ 'delay' => [ @@ -96,7 +96,7 @@ public function __construct(array $connectionConfiguration, array $exchangeConfi ], ], $connectionConfiguration); $this->exchangeConfiguration = $exchangeConfiguration; - $this->queueConfiguration = $queueConfiguration; + $this->queuesConfiguration = $queuesConfiguration; $this->amqpFactory = $amqpFactory ?: new AmqpFactory(); } @@ -111,8 +111,10 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am 'host' => $parsedUrl['host'] ?? 'localhost', 'port' => $parsedUrl['port'] ?? 5672, 'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/', - 'queue' => [ - 'name' => $queueName = $pathParts[1] ?? 'messages', + 'queues' => [ + [ + 'name' => $queueName = $pathParts[1] ?? 'messages', + ], ], 'exchange' => [ 'name' => $queueName, @@ -134,14 +136,18 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am } $exchangeOptions = $amqpOptions['exchange']; - $queueOptions = $amqpOptions['queue']; - unset($amqpOptions['queue'], $amqpOptions['exchange']); + $queuesOptions = $amqpOptions['queues']; + unset($amqpOptions['queues'], $amqpOptions['exchange']); - if (\is_array($queueOptions['arguments'] ?? false)) { - $queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']); - } + $queuesOptions = array_map(function (array $queueOptions) { + if (\is_array($queuesOptions['arguments'] ?? false)) { + $queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']); + } + + return $queueOptions; + }, $queuesOptions); - return new self($amqpOptions, $exchangeOptions, $queueOptions, $amqpFactory); + return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory); } private static function normalizeQueueArguments(array $arguments): array @@ -178,9 +184,11 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin $this->setup(); } - $flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM; - $attributes = $this->getAttributes($headers); - $routingKey = $routingKey ?? $this->getExchangeRoutingKey(); + // TODO - allow flag & attributes to be configured on the message + $flags = []; + $attributes = []; + $attributes = array_merge_recursive($attributes, ['headers' => $headers]); + $routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey(); $this->exchange()->publish($body, $routingKey, $flags, $attributes); } @@ -194,14 +202,16 @@ private function publishWithDelay(string $body, array $headers, int $delay, ?str $this->setupDelay($delay, $exchangeRoutingKey); } + // TODO - allow flag & attributes to be configured on the message + $flags = []; + $attributes = []; + $attributes = array_merge_recursive($attributes, ['headers' => $headers]); $routingKey = $this->getRoutingKeyForDelay($delay); - $flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM; - $attributes = $this->getAttributes($headers); $this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes); } - private function setupDelay(int $delay, ?string $exchangeRoutingKey) + private function setupDelay(int $delay, ?string $routingKey) { if (!$this->channel()->isConnected()) { $this->clear(); @@ -210,7 +220,7 @@ private function setupDelay(int $delay, ?string $exchangeRoutingKey) $exchange = $this->getDelayExchange(); $exchange->declareExchange(); - $queue = $this->createDelayQueue($delay, $exchangeRoutingKey); + $queue = $this->createDelayQueue($delay, $routingKey); $queue->declareQueue(); $queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay)); } @@ -235,7 +245,7 @@ private function getDelayExchange(): \AMQPExchange * which is the original exchange, resulting on it being put back into * the original queue. */ - private function createDelayQueue(int $delay, ?string $exchangeRoutingKey) + private function createDelayQueue(int $delay, ?string $routingKey) { $delayConfiguration = $this->connectionConfiguration['delay']; @@ -246,10 +256,10 @@ private function createDelayQueue(int $delay, ?string $exchangeRoutingKey) 'x-dead-letter-exchange' => $this->exchange()->getName(), ]); - $exchangeRoutingKey = $exchangeRoutingKey ?? $this->getExchangeRoutingKey(); - if (null !== $exchangeRoutingKey) { + $routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey(); + if (null !== $routingKey) { // after being released from to DLX, this routing key will be used - $queue->setArgument('x-dead-letter-routing-key', $exchangeRoutingKey); + $queue->setArgument('x-dead-letter-routing-key', $routingKey); } return $queue; @@ -261,18 +271,18 @@ private function getRoutingKeyForDelay(int $delay): string } /** - * Waits and gets a message from the configured queue. + * Gets a message from the specified queue. * * @throws \AMQPException */ - public function get(): ?\AMQPEnvelope + public function get(string $queueName): ?\AMQPEnvelope { if ($this->shouldSetup()) { $this->setup(); } try { - if (false !== $message = $this->queue()->get()) { + if (false !== $message = $this->queue($queueName)->get()) { return $message; } } catch (\AMQPQueueException $e) { @@ -289,14 +299,14 @@ public function get(): ?\AMQPEnvelope return null; } - public function ack(\AMQPEnvelope $message): bool + public function ack(\AMQPEnvelope $message, string $queueName): bool { - return $this->queue()->ack($message->getDeliveryTag()); + return $this->queue($queueName)->ack($message->getDeliveryTag()); } - public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool + public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQP_NOPARAM): bool { - return $this->queue()->nack($message->getDeliveryTag(), $flags); + return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags); } public function setup(): void @@ -307,10 +317,25 @@ public function setup(): void $this->exchange()->declareExchange(); - $this->queue()->declareQueue(); - $this->queue()->bind($this->exchange()->getName(), $this->queueConfiguration['routing_key'] ?? null); + foreach ($this->queuesConfiguration as $queueName => $queueConfig) { + $this->queue($queueName)->declareQueue(); + foreach ($queueConfig['routing_keys'] ?? [] as $routingKey) { + $this->queue($queueName)->bind($this->exchange()->getName(), $routingKey); + } + } } + /** + * @return string[] + */ + public function getAllQueueNames(): array + { + return array_keys($this->queuesConfiguration); + } + + /** + * @internal + */ public function channel(): \AMQPChannel { if (null === $this->amqpChannel) { @@ -335,22 +360,29 @@ public function channel(): \AMQPChannel return $this->amqpChannel; } - public function queue(): \AMQPQueue + /** + * @internal + */ + public function queue(string $queueName): \AMQPQueue { - if (null === $this->amqpQueue) { - $this->amqpQueue = $this->amqpFactory->createQueue($this->channel()); - $this->amqpQueue->setName($this->queueConfiguration['name']); - $this->amqpQueue->setFlags($this->queueConfiguration['flags'] ?? AMQP_DURABLE); + if (!isset($this->amqpQueues[$queueName])) { + $queueConfig = $this->queuesConfiguration[$queueName]; - if (isset($this->queueConfiguration['arguments'])) { - $this->amqpQueue->setArguments($this->queueConfiguration['arguments']); + $amqpQueue = $this->amqpFactory->createQueue($this->channel()); + $amqpQueue->setName($queueConfig['name']); + $amqpQueue->setFlags($queueConfig['flags'] ?? AMQP_DURABLE); + + if (isset($queueConfig['arguments'])) { + $amqpQueue->setArguments($queueConfig['arguments']); } + + $this->amqpQueues[$queueName] = $amqpQueue; } - return $this->amqpQueue; + return $this->amqpQueues[$queueName]; } - public function exchange(): \AMQPExchange + private function exchange(): \AMQPExchange { if (null === $this->amqpExchange) { $this->amqpExchange = $this->amqpFactory->createExchange($this->channel()); @@ -374,7 +406,7 @@ public function getConnectionConfiguration(): array private function clear(): void { $this->amqpChannel = null; - $this->amqpQueue = null; + $this->amqpQueues = []; $this->amqpExchange = null; } @@ -391,19 +423,8 @@ private function shouldSetup(): bool return true; } - private function getAttributes(array $headers): array + private function getDefaultPublishRoutingKey(): ?string { - return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]); - } - - private function getExchangeRoutingKey(): ?string - { - $routingKey = $this->exchangeConfiguration['routing_key'] ?? null; - if (null === $routingKey && isset($this->queueConfiguration['routing_key'])) { - $routingKey = $this->queueConfiguration['routing_key']; - @trigger_error('Routing key from "queue" configuration is deprecated. Use "exchange" configuration instead.', E_USER_DEPRECATED); - } - - return $routingKey; + return $this->exchangeConfiguration['default_publish_routing_key'] ?? null; } }