diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index 5e9652dcede35..3428eb017f1b1 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -804,6 +804,23 @@ private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, stri return Connection::fromDsn('amqp://localhost', [], $factory); } + + public function testForcePublishWithDefaultRoutingKey() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->once())->method('publish')->with('body', 'default_routing_key'); + $amqpEnvelope = $this->createMock(\AMQPEnvelope::class); + $amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, null, '', true); + + $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory); + $connection->publish('body', [], 0, $amqpStamp); + } } class TestAmqpFactory extends AmqpFactory diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php index cbcecffa7ef57..60f80166edaae 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php @@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -59,7 +60,8 @@ public function send(Envelope $envelope): Envelope $amqpStamp = AmqpStamp::createFromAmqpEnvelope( $amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp, - $envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null + $envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null, + $envelope->last(SentToFailureTransportStamp::class) ? true : false ); } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php index 127593d38b66d..1342ca1ed2dbb 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php @@ -23,6 +23,7 @@ final class AmqpStamp implements NonSendableStampInterface private int $flags; private array $attributes; private bool $isRetryAttempt = false; + private bool $forceDefaultRoutingKey = false; public function __construct(string $routingKey = null, int $flags = \AMQP_NOPARAM, array $attributes = []) { @@ -46,7 +47,7 @@ public function getAttributes(): array return $this->attributes; } - public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null): self + public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null, bool $forceDefaultRoutingKey = false): self { $attr = $previousStamp->attributes ?? []; @@ -71,6 +72,8 @@ public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $stamp->isRetryAttempt = true; } + $stamp->forceDefaultRoutingKey = $forceDefaultRoutingKey; + return $stamp; } @@ -87,4 +90,9 @@ public static function createWithAttributes(array $attributes, self $previousSta array_merge($previousStamp->attributes ?? [], $attributes) ); } + + public function isForceDefaultRoutingKey(): bool + { + return $this->forceDefaultRoutingKey; + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 1ed87b1a3b510..69b93911098f0 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -563,6 +563,10 @@ public function purgeQueues(): void private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string { + if ($amqpStamp?->isForceDefaultRoutingKey()) { + return $this->getDefaultPublishRoutingKey(); + } + return $amqpStamp?->getRoutingKey() ?? $this->getDefaultPublishRoutingKey(); } }