8000 [Messenger][Amqp] Added default exchange support · symfony/symfony@74cebbe · GitHub
[go: up one dir, main page]

Skip to content

Commit 74cebbe

Browse files
committed
[Messenger][Amqp] Added default exchange support
1 parent a0721fd commit 74cebbe

File tree

4 files changed

+94
-7
lines changed

4 files changed

+94
-7
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add option `delay[arguments]` in the transport definition
8+
* Add default exchange support
89

910
6.0
1011
---

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,36 @@ public function testItSendsAndReceivesMessages()
7777
$this->assertEmpty(iterator_to_array($receiver->get()));
7878
}
7979

80+
public function testItSendsAndReceivesMessagesThroughDefaultExchange()
81+
{
82+
$serializer = $this->createSerializer();
83+
84+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['exchange' => ['name' => '']]);
85+
$connection->setup();
86+
$connection->purgeQueues();
87+
88+
$sender = new AmqpSender($connection, $serializer);
89+
$receiver = new AmqpReceiver($connection, $serializer);
90+
91+
$sender->send($first = new Envelope(new DummyMessage('First'), [new AmqpStamp('messages')]));
92+
$sender->send($second = new Envelope(new DummyMessage('Second'), [new AmqpStamp('messages')]));
93+
94+
$envelopes = iterator_to_array($receiver->get());
95+
$this->assertCount(1, $envelopes);
96+
/** @var Envelope $envelope */
97+
$envelope = $envelopes[0];
98+
$this->assertEquals($first->getMessage(), $envelope->getMessage());
99+
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
100+
101+
$envelopes = iterator_to_array($receiver->get());
102+
$this->assertCount(1, $envelopes);
103+
/** @var Envelope $envelope */
104+
$envelope = $envelopes[0];
105+
$this->assertEquals($second->getMessage(), $envelope->getMessage());
106+
107+
$this->assertEmpty(iterator_to_array($receiver->get()));
108+
}
109+
80110
public function testRetryAndDelay()
81111
{
82112
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,55 @@ private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, stri
877877

878878
return Connection::fromDsn('amqp://localhost', [], $factory);
879879
}
880+
881+
public function testGettingDefaultExchange()
882+
{
883+
$factory = $this->createMock(AmqpFactory::class);
884+
885+
$amqpExchange = $this->createMock(\AMQPExchange::class);
886+
$amqpExchange->expects($this->once())->method('setName')->with('');
887+
$amqpExchange->expects($this->never())->method('setType');
888+
$amqpExchange->expects($this->never())->method('setFlags');
889+
$amqpExchange->expects($this->never())->method('setArguments');
890+
891+
$factory->expects($this->once())->method('createExchange')->willReturn($amqpExchange);
892+
893+
$connection = new Connection([
894+
'host' => 'localhost',
895+
'port' => 5672,
896+
'vhost' => '/',
897+
], [
898+
'name' => '',
899+
], [
900+
'' => [],
901+
], $factory);
902+
903+
$connection->exchange();
904+
}
905+
906+
public function testBindIsNotCalledWhenPublishingInDefaultExchange()
907+
{
908+
$factory = $this->createMock(AmqpFactory::class);
909+
910+
$amqpExchange = $this->createMock(\AMQPExchange::class);
911+
$amqpExchange->expects($this->never())->method('declareExchange');
912+
913+
$factory->expects($this->once())->method('createExchange')->willReturn($amqpExchange);
914+
$factory->expects($this->once())->method('createQueue')->willReturn($queue = $this->createMock(\AMQPQueue::class));
915+
$queue->expects($this->never())->method('bind');
916+
917+
$connection = new Connection([
918+
'host' => 'localhost',
919+
'port' => 5672,
920+
'vhost' => '/',
921+
], [
922+
'name' => '',
923+
], [
924+
'' => [],
925+
], $factory);
926+
927+
$connection->publish('body');
928+
}
880929
}
881930

882931
class TestAmqpFactory extends AmqpFactory

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -455,12 +455,17 @@ public function setup(): void
455455

456456
private function setupExchangeAndQueues(): void
457457
{
458-
$this->exchange()->declareExchange();
458+
$exchange = $this->exchange();
459+
if ($this->exchangeOptions['name'] !== '') {
460+
$exchange->declareExchange();
461+
}
459462

460463
foreach ($this->queuesOptions as $queueName => $queueConfig) {
461464
$this->queue($queueName)->declareQueue();
462-
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
463-
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
465+
if ($this->exchangeOptions['name'] !== '') {
466+
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
467+
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
468+
}
464469
}
465470
}
466471
$this->autoSetupExchange = false;
@@ -534,11 +539,13 @@ public function exchange(): \AMQPExchange
534539
if (!isset($this->amqpExchange)) {
535540
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
536541
$this->amqpExchange->setName($this->exchangeOptions['name']);
537-
$this->amqpExchange->setType($this->exchangeOptions['type'] ?? \AMQP_EX_TYPE_FANOUT);
538-
$this->amqpExchange->setFlags($this->exchangeOptions['flags'] ?? \AMQP_DURABLE);
542+
if ($this->exchangeOptions['name'] !== '') {
543+
$this->amqpExchange->setType($this->exchangeOptions['type'] ?? \AMQP_EX_TYPE_FANOUT);
544+
$this->amqpExchange->setFlags($this->exchangeOptions['flags'] ?? \AMQP_DURABLE);
539545

540-
if (isset($this->exchangeOptions['arguments'])) {
541-
$this->amqpExchange->setArguments($this->exchangeOptions['arguments']);
546+
if (isset($this->exchangeOptions['arguments'])) {
547+
$this->amqpExchange->setArguments($this->exchangeOptions['arguments']);
548+
}
542549
}
543550
}
544551

0 commit comments

Comments
 (0)
0