8000 [Messenger][Amqp] Fix wrong routing key when use failure queue · symfony/symfony@908a9f3 · GitHub
[go: up one dir, main page]

Skip to content

Commit 908a9f3

Browse files
author
Fabien Perroquin
committed
[Messenger][Amqp] Fix wrong routing key when use failure queue
1 parent 969d39a commit 908a9f3

File tree

4 files changed

+33
-2
lines changed

4 files changed

+33
-2
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,23 @@ private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, stri
804804

805805
return Connection::fromDsn('amqp://localhost', [], $factory);
806806
}
807+
808+
public function testForcePublishWithDefaultRoutingKey()
809+
{
810+
$factory = new TestAmqpFactory(
811+
$amqpConnection = $this->createMock(\AMQPConnection::class),
812+
$amqpChannel = $this->createMock(\AMQPChannel::class),
813+
$amqpQueue = $this->createMock(\AMQPQueue::class),
814+
$amqpExchange = $this->createMock(\AMQPExchange::class)
815+
);
816+
817+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'default_routing_key');
818+
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
819+
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, null, '', true);
820+
821+
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
822+
$connection->publish('body', [], 0, $amqpStamp);
823+
}
807824
}
808825

809826
class TestAmqpFactory extends AmqpFactory

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\Messenger\Exception\TransportException;
1616
use Symfony\Component\Messenger\Stamp\DelayStamp;
1717
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
18+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
1819
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1920
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2021
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -59,7 +60,8 @@ public function send(Envelope $envelope): Envelope
5960
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
6061
$amqpReceivedStamp->getAmqpEnvelope(),
6162
$amqpStamp,
62-
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null
63+
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null,
64+
$envelope->last(SentToFailureTransportStamp::class) ? true : false
6365
);
6466
}
6567

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ final class AmqpStamp implements NonSendableStampInterface
2323
private int $flags;
2424
private array $attributes;
2525
private bool $isRetryAttempt = false;
26+
private bool $forceDefaultRoutingKey = false;
2627

2728
public function __construct(string $routingKey = null, int $flags = \AMQP_NOPARAM, array $attributes = [])
2829
{
@@ -46,7 +47,7 @@ public function getAttributes(): array
4647
return $this->attributes;
4748
}
4849

49-
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null): self
50+
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null, bool $forceDefaultRoutingKey = false): self
5051
{
5152
$attr = $previousStamp->attributes ?? [];
5253

@@ -71,6 +72,8 @@ public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self
7172
$stamp->isRetryAttempt = true;
7273
}
7374

75+
$stamp->forceDefaultRoutingKey = $forceDefaultRoutingKey;
76+
7477
return $stamp;
7578
}
7679

@@ -87,4 +90,9 @@ public static function createWithAttributes(array $attributes, self $previousSta
8790
array_merge($previousStamp->attributes ?? [], $attributes)
8891
);
8992
}
93+
94+
public function isForceDefaultRoutingKey(): bool
95+
{
96+
return $this->forceDefaultRoutingKey;
97+
}
9098
}

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,10 @@ public function purgeQueues(): void
563563

564564
private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
565565
{
566+
if($amqpStamp?->isForceDefaultRoutingKey()) {
567+
return $this->getDefaultPublishRoutingKey();
568+
}
569+
566570
return $amqpStamp?->getRoutingKey() ?? $this->getDefaultPublishRoutingKey();
567571
}
568572
}

0 commit comments

Comments
 (0)
0