8000 feature #54275 [Messenger] [Amqp] Add default exchange support (ilyac… · symfony/symfony@23e2230 · GitHub
[go: up one dir, main page]

Skip to content

Commit 23e2230

Browse files
committed
feature #54275 [Messenger] [Amqp] Add default exchange support (ilyachase)
This PR was squashed before being merged into the 7.3 branch. Discussion ---------- [Messenger] [Amqp] Add default exchange support | Q | A | ------------- | --- | Branch? | 7.2 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | Fix #45784 | License | MIT Usage: ```yaml transports: my_queue: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: exchange: # Indicates that the default Exchange is to be used name: '' default_publish_routing_key: my_queue queues: my_queue: ~ ``` It is not a breaking change, because currently using `name: ''` leads to the `Could not declare exchange. Exchanges must have a name` error. This PR allows using the default exchange instead. It's also possible to add more queues and then use them directly with a stamp: ``` queues: foo: ~ bar: ~ ``` ``` $messageBus->dispatch(new MyMessage(), [new AmqpStamp('bar')]); ``` Commits ------- 2f60031 [Messenger] [Amqp] Add default exchange support
2 parents 8370ad6 + 2f60031 commit 23e2230

File tree

4 files changed

+101
-9
lines changed

4 files changed

+101
-9
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
CHANGELOG
22
=========
33

4+
7.3
5+
---
6+
7+
* Add default exchange support
8+
49
7.1
510
---
611

7-
* Implement the `CloseableTransportInterface` to allow closing the AMQP connection
12+
* Implement the `CloseableTransportInterface` to allow closing the AMQP connection
813
* Add option `delay[arguments]` in the transport definition
914

1015
6.0

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,36 @@ public function testItSendsAndReceivesMessages()
7575
$this->assertSame([], iterator_to_array($receiver->get()));
7676
}
7777

78+
public function testItSendsAndReceivesMessagesThroughDefaultExchange()
79+
{
80+
$serializer = $this->createSerializer();
81+
82+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['exchange' => ['name' => '']]);
83+
$connection->setup();
84+
$connection->purgeQueues();
85+
86+
$sender = new AmqpSender($connection, $serializer);
87+
$receiver = new AmqpReceiver($connection, $serializer);
88+
89+
$sender->send($first = new Envelope(new DummyMessage('First'), [new AmqpStamp('messages')]));
90+
$sender->send($second = new Envelope(new DummyMessage('Second'), [new AmqpStamp('messages')]));
91+
92+
$envelopes = iterator_to_array($receiver->get());
93+
$this->assertCount(1, $envelopes);
94+
/** @var Envelope $envelope */
95+
$envelope = $envelopes[0];
96+
$this->assertEquals($first->getMessage(), $envelope->getMessage());
97+
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
98+
99+
$envelopes = iterator_to_array($receiver->get());
100+
$this->assertCount(1, $envelopes);
101+
/** @var Envelope $envelope */
102+
$envelope = $envelopes[0];
103+
$this->assertEquals($second->getMessage(), $envelope->getMessage());
104+
105+
$this->assertEmpty(iterator_to_array($receiver->get()));
106+
}
107+
78108
public function testRetryAndDelay()
79109
{
80110
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,55 @@ private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, stri
875875

876876
return Connection::fromDsn('amqp://localhost', [], $factory);
877877
}
878+
879+
public function testGettingDefaultExchange()
880+
{
881+
$factory = $this->createMock(AmqpFactory::class);
882+
883+
$amqpExchange = $this->createMock(\AMQPExchange::class);
884+
$amqpExchange->expects($this->once())->method('setName')->with('');
885+
$amqpExchange->expects($this->once())->method('setType')->with(\AMQP_EX_TYPE_DIRECT);
886+
$amqpExchange->expects($this->never())->method('setFlags');
887+
$amqpExchange->expects($this->never())->method('setArguments');
888+
889+
$factory->expects($this->once())->method('createExchange')->willReturn($amqpExchange);
890+
891+
$connection = new Connection([
892+
'host' => 'localhost',
893+
'port' => 5672,
894+
'vhost' => '/',
895+
], [
896+
'name' => '',
897+
], [
898+
'' => [],
899+
], $factory);
900+
901+
$connection->exchange();
902+
}
903+
904+
public function testBindIsNotCalledWhenPublishingInDefaultExchange()
905+
{
906+
$factory = $this->createMock(AmqpFactory::class);
907+
908+
$amqpExchange = $this->createMock(\AMQPExchange::class);
909+
$amqpExchange->expects($this->never())->method('declareExchange');
910+
911+
$factory->expects($this->once())->method('createExchange')->willReturn($amqpExchange);
912+
$factory->expects($this->once())->method('createQueue')->willReturn($queue = $this->createMock(\AMQPQueue::class));
913+
$queue->expects($this->never())->method('bind');
914+
915+
$connection = new Connection([
916+
'host' => 'localhost',
917+
'port' => 5672,
918+
'vhost' => '/',
919+
], [
920+
'name' => '',
921+
], [
922+
'' => [],
923+
], $factory);
924+
925+
$connection->publish('body');
926+
}
878927
}
879928

880929
class TestAmqpFactory extends AmqpFactory

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public function __construct(
134134
* * flags: Queue flags (Default: AMQP_DURABLE)
135135
* * arguments: Extra arguments
136136
* * exchange:
137-
* * name: Name of the exchange
137+
* * name: Name of the exchange. An empty string (name: '') can be used to use the default exchange
138138
* * type: Type of exchange (Default: fanout)
139139
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
140140
* * flags: Exchange flags (Default: AMQP_DURABLE)
@@ -454,12 +454,17 @@ public function setup(): void
454454

455455
private function setupExchangeAndQueues(): void
456456
{
457-
$this->exchange()->declareExchange();
457+
$exchange = $this->exchange();
458+
if ('' !== $this->exchangeOptions['name']) {
459+
$exchange->declareExchange();
460+
}
458461

459462
foreach ($this->queuesOptions as $queueName => $queueConfig) {
460463
$this->queue($queueName)->declareQueue();
461-
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
462-
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
464+
if ('' !== $this->exchangeOptions['name']) {
465+
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
466+
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
467+
}
463468
}
464469
}
465470
$this->autoSetupExchange = false;
@@ -533,11 +538,14 @@ public function exchange(): \AMQPExchange
533538
if (!isset($this->amqpExchange)) {
534539
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
535540
$this->amqpExchange->setName($this->exchangeOptions['name']);
536-
$this->amqpExchange->setType($this->exchangeOptions['type'] ?? \AMQP_EX_TYPE_FANOUT);
537-
$this->amqpExchange->setFlags($this->exchangeOptions['flags'] ?? \AMQP_DURABLE);
541+
$defaultExchangeType = '' !== $this->exchangeOptions['name'] ? \AMQP_EX_TYPE_FANOUT : \AMQP_EX_TYPE_DIRECT;
542+
$this->amqpExchange->setType($this->exchangeOptions['type'] ?? $defaultExchangeType);
543+
if ('' !== $this->exchangeOptions['name']) {
544+
$this->amqpExchange->setFlags($this->exchangeOptions['flags'] ?? \AMQP_DURABLE);
538545

539-
if (isset($this->exchangeOptions['arguments'])) {
540-
$this->amqpExchange->setArguments($this->exchangeOptions['arguments']);
546+
if (isset($this->exchangeOptions['arguments'])) {
547+
$this->amqpExchange->setArguments($this->exchangeOptions['arguments']);
548+
}
541549
}
542550
}
543551

0 commit comments

Comments
 (0)
0