diff --git a/UPGRADE-4.2.md b/UPGRADE-4.2.md
index 670fe3c7cfc70..3d2ae32351735 100644
--- a/UPGRADE-4.2.md
+++ b/UPGRADE-4.2.md
@@ -88,7 +88,9 @@ Messenger
---------
* The `handle` method of the `Symfony\Component\Messenger\Middleware\ValidationMiddleware` and `Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware` middlewares now requires an `Envelope` object to be given (because they implement the `EnvelopeAwareInterface`). When using these middleware with the provided `MessageBus`, you will not have to do anything. If you use the middlewares any other way, you can use `Envelope::wrap($message)` to create an envelope for your message.
-
+ * The method `getConnectionCredentials` of the AMQP transport class `Symfony\Component\Messenger\Transport\AmqpExt\Connection`
+ has been renamed to `getConnectionConfiguration`.
+
Security
--------
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index 0b09978d33870..5e3b5a6db2d89 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -64,6 +64,7 @@
%kernel.debug%
+
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
index 5666598337655..0508670503547 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
@@ -69,7 +69,7 @@ public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
- $connection->expects($this->once())->method('nack')->with($envelope);
+ $connection->expects($this->once())->method('nack')->with($envelope, AMQP_REQUEUE);
$receiver = new AmqpReceiver($serializer, $connection);
$receiver->receive(function () {
@@ -101,6 +101,60 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn
throw new WillNeverWorkException('Well...');
});
}
+
+ public function testItPublishesTheMessageForRetryIfSuchConfiguration()
+ {
+ $serializer = new Serializer(
+ new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
+ );
+
+ $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
+ $envelope->method('getBody')->willReturn('{"message": "Hi"}');
+ $envelope->method('getHeaders')->willReturn(array(
+ 'type' => DummyMessage::class,
+ ));
+
+ $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
+ $connection->method('get')->willReturn($envelope);
+ $connection->method('getConnectionConfiguration')->willReturn(array('retry' => array('attempts' => 3)));
+ $connection->method('publishForRetry')->with($envelope)->willReturn(true);
+
+ $connection->expects($this->once())->method('ack')->with($envelope);
+
+ $receiver = new AmqpReceiver($serializer, $connection);
+ $receiver->receive(function (Envelope $envelope) use ($receiver) {
+ $this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
+ $receiver->stop();
+ });
+ }
+
+ /**
+ * @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\InterruptException
+ */
+ public function testItThrowsTheExceptionIfTheRetryPublishDidNotWork()
+ {
+ $serializer = new Serializer(
+ new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
+ );
+
+ $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
+ $envelope->method('getBody')->willReturn('{"message": "Hi"}');
+ $envelope->method('getHeaders')->willReturn(array(
+ 'type' => DummyMessage::class,
+ ));
+
+ $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
+ $connection->method('get')->willReturn($envelope);
+ $connection->method('getConnectionConfiguration')->willReturn(array('retry' => array('attempts' => 3)));
+ $connection->method('publishForRetry')->with($envelope)->willReturn(false);
+
+ $connection->expects($this->never())->method('ack')->with($envelope);
+
+ $receiver = new AmqpReceiver($serializer, $connection);
+ $receiver->receive(function () {
+ throw new InterruptException('Well...');
+ });
+ }
}
class InterruptException extends \Exception
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportFactoryTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportFactoryTest.php
index 53a98e2263a07..0e2396f63af1b 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportFactoryTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportFactoryTest.php
@@ -41,7 +41,7 @@ public function testItCreatesTheTransport()
true
);
- $expectedTransport = new AmqpTransport($encoder, $decoder, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true), array('foo' => 'bar'), true);
+ $expectedTransport = new AmqpTransport($encoder, $decoder, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true));
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar')));
}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php
index 511f8fe3c4414..56b809dd364ea 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php
@@ -189,6 +189,98 @@ public function testItCanDisableTheSetup()
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', array(), true, $factory);
$connection->publish('body');
}
+
+ public function testItRetriesTheMessage()
+ {
+ $amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
+ $amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
+ $retryQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
+
+ $factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
+ $factory->method('createConnection')->willReturn($amqpConnection);
+ $factory->method('createChannel')->willReturn($amqpChannel);
+ $factory->method('createQueue')->willReturn($retryQueue);
+ $factory->method('createExchange')->will($this->onConsecutiveCalls(
+ $retryExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
+ $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
+ ));
+
+ $amqpExchange->expects($this->once())->method('setName')->with('messages');
+ $amqpExchange->method('getName')->willReturn('messages');
+
+ $retryExchange->expects($this->once())->method('setName')->with('retry');
+ $retryExchange->expects($this->once())->method('declareExchange');
+ $retryExchange->method('getName')->willReturn('retry');
+
+ $retryQueue->expects($this->once())->method('setName')->with('retry_queue_1');
+ $retryQueue->expects($this->once())->method('setArguments')->with(array(
+ 'x-message-ttl' => 10000,
+ 'x-dead-letter-exchange' => 'messages',
+ ));
+
+ $retryQueue->expects($this->once())->method('declareQueue');
+ $retryQueue->expects($this->once())->method('bind')->with('retry', 'attempt_1');
+
+ $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
+ $envelope->method('getHeader')->with('symfony-messenger-attempts')->willReturn(false);
+ $envelope->method('getHeaders')->willReturn(array('x-some-headers' => 'foo'));
+ $envelope->method('getBody')->willReturn('{}');
+
+ $retryExchange->expects($this->once())->method('publish')->with('{}', 'attempt_1', AMQP_NOPARAM, array('headers' => array('x-some-headers' => 'foo', 'symfony-messenger-attempts' => 1)));
+
+ $connection = Connection::fromDsn('amqp://localhost/%2f/messages', array('retry' => array('attempts' => 3)), false, $factory);
+ $connection->publishForRetry($envelope);
+ }
+
+ public function testItRetriesTheMessageWithADifferentRoutingKeyAndTTLs()
+ {
+ $amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
+ $amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
+ $retryQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
+
+ $factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
+ $factory->method('createConnection')->willReturn($amqpConnection);
+ $factory->method('createChannel')->willReturn($amqpChannel);
+ $factory->method('createQueue')->willReturn($retryQueue);
+ $factory->method('createExchange')->will($this->onConsecutiveCalls(
+ $retryExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
+ $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
+ ));
+
+ $amqpExchange->expects($this->once())->method('setName')->with('messages');
+ $amqpExchange->method('getName')->willReturn('messages');
+
+ $retryExchange->expects($this->once())->method('setName')->with('retry');
+ $retryExchange->expects($this->once())->method('declareExchange');
+ $retryExchange->method('getName')->willReturn('retry');
+
+ $connectionOptions = array(
+ 'retry' => array(
+ 'attempts' => 3,
+ 'dead_routing_key' => 'my_dead_routing_key',
+ 'ttl' => array(30000, 60000, 120000),
+ ),
+ );
+
+ $connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, false, $factory);
+
+ $messageRetriedTwice = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
+ $messageRetriedTwice->method('getHeader')->with('symfony-messenger-attempts')->willReturn('2');
+ $messageRetriedTwice->method('getHeaders')->willReturn(array('symfony-messenger-attempts' => '2'));
+ $messageRetriedTwice->method('getBody')->willReturn('{}');
+
+ $retryQueue->expects($this->once())->method('setName')->with('retry_queue_3');
+ $retryQueue->expects($this->once())->method('setArguments')->with(array(
+ 'x-message-ttl' => 120000,
+ 'x-dead-letter-exchange' => 'messages',
+ ));
+
+ $retryQueue->expects($this->once())->method('declareQueue');
+ $retryQueue->expects($this->once())->method('bind')->with('retry', 'attempt_3');
+
+ $retryExchange->expects($this->once())->method('publish')->with('{}', 'attempt_3', AMQP_NOPARAM, array('headers' => array('symfony-messenger-attempts' => 3)));
+ $connection->publishForRetry($messageRetriedTwice);
+ }
}
class TestAmqpFactory extends AmqpFactory
diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
index 0e6fbff8ee340..4c88e6166b845 100644
--- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
+++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
@@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
+use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
@@ -22,14 +23,18 @@
*/
class AmqpReceiver implements ReceiverInterface
{
+ private const DEFAULT_LOOP_SLEEP_IN_MICRO_SECONDS = 200000;
+
private $decoder;
private $connection;
+ private $logger;
private $shouldStop;
- public function __construct(DecoderInterface $decoder, Connection $connection)
+ public function __construct(DecoderInterface $decoder, Connection $connection, LoggerInterface $logger = null)
{
$this->decoder = $decoder;
$this->connection = $connection;
+ $this->logger = $logger;
}
/**
@@ -39,10 +44,11 @@ public function receive(callable $handler): void
{
while (!$this->shouldStop) {
$AMQPEnvelope = $this->connection->get();
+
if (null === $AMQPEnvelope) {
$handler(null);
- usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
+ usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? self::DEFAULT_LOOP_SLEEP_IN_MICRO_SECONDS);
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
@@ -62,9 +68,25 @@ public function receive(callable $handler): void
throw $e;
} catch (\Throwable $e) {
- $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
+ try {
+ $retried = $this->connection->publishForRetry($AMQPEnvelope);
+ } catch (\Throwable $retryException) {
+ $this->logger && $this->logger->warning(sprintf('Retrying message #%s failed. Requeuing it now.', $AMQPEnvelope->getMessageId()), array(
+ 'retryException' => $retryException,
+ 'exception' => $e,
+ ));
- throw $e;
+ $retried = false;
+ }
+
+ if (!$retried) {
+ $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
+
+ throw $e;
+ }
+
+ // Acknowledge current message as another one as been requeued.
+ $this->connection->ack($AMQPEnvelope);
} finally {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
@@ -73,6 +95,9 @@ public function receive(callable $handler): void
}
}
+ /**
+ * {@inheritdoc}
+ */
public function stop(): void
{
$this->shouldStop = true;
diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
index 3edefd0ab1c8a..d97dce3a3a23d 100644
--- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
+++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
@@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
+use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
@@ -26,12 +27,14 @@ class AmqpTransport implements TransportInterface
private $connection;
private $receiver;
private $sender;
+ private $logger;
- public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, Connection $connection)
+ public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, Connection $connection, LoggerInterface $logger = null)
{
$this->encoder = $encoder;
$this->decoder = $decoder;
$this->connection = $connection;
+ $this->logger = $logger;
}
/**
@@ -60,7 +63,7 @@ public function send(Envelope $envelope): void
private function getReceiver()
{
- return $this->receiver = new AmqpReceiver($this->decoder, $this->connection);
+ return $this->receiver = new AmqpReceiver($this->decoder, $this->connection, $this->logger);
}
private function getSender()
diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php
index 29fb4ae4aa3e0..88f547285f97a 100644
--- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php
+++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php
@@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
+use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
@@ -24,17 +25,19 @@ class AmqpTransportFactory implements TransportFactoryInterface
private $encoder;
private $decoder;
private $debug;
+ private $logger;
- public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug)
+ public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug, LoggerInterface $logger = null)
{
$this->encoder = $encoder;
$this->decoder = $decoder;
$this->debug = $debug;
+ $this->logger = $logger;
}
public function createTransport(string $dsn, array $options): TransportInterface
{
- return new AmqpTransport($this->encoder, $this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
+ return new AmqpTransport($this->encoder, $this->decoder, Connection::fromDsn($dsn, $options, $this->debug), $this->logger);
}
public function supports(string $dsn, array $options): bool
diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
index 6f50bd76d5e65..0c1baf40be68b 100644
--- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
+++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
@@ -18,7 +18,10 @@
*/
class Connection
{
- private $connectionCredentials;
+ private const DEFAULT_MESSAGE_TTL_IN_MILLI_SECONDS = 10000;
+ private const DEFAULT_MAX_ATTEMPTS = 3;
+
+ private $connectionConfiguration;
private $exchangeConfiguration;
private $queueConfiguration;
private $debug;
@@ -39,9 +42,50 @@ class Connection
*/
private $amqpQueue;
- public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, bool $debug = false, AmqpFactory $amqpFactory = null)
+ /**
+ * @var \AMQPExchange|null
+ */
+ private $amqpRetryExchange;
+
+ /**
+ * Available options:
+ *
+ * * host: Hostname of the AMQP service
+ * * port: Port of the AMQP service
+ * * vhost: Virtual Host to use with the AMQP service
+ * * user: Username to use to connect the the AMQP service
+ * * password: Username to use the connect to the AMQP service
+ * * queue:
+ * * name: Name of the queue
+ * * routing_key: The routing key (if any) to use to push the messages to
+ * * flags: Queue flags (Default: AMQP_DURABLE)
+ * * arguments: Extra arguments
+ * * exchange:
+ * * name: Name of the exchange
+ * * type: Type of exchange (Default: fanout)
+ * * flags: Exchange flags (Default: AMQP_DURABLE)
+ * * arguments: Extra arguments
+ * * retry:
+ * * attempts: Number of times it will try to retry
+ * * routing_key_pattern: The pattern of the routing key (Default: "attempt_%attempt%")
+ * * dead_queue: Name of the queue in which messages that retry more than attempts time are pushed to
+ * * dead_routing_key: Routing key name for the dead queue (Default: "dead")
+ * * queue_name_pattern: Pattern to use to create the queues (Default: "retry_queue_%attempt%")
+ * * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
+ * * ttl: Key-value pairs of attempt number -> seconds to wait. If not configured, 10 seconds will be waited each attempt.
+ * * auto-setup: Enable or not the auto-setup of queues and exchanges (Default: true)
+ * * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
+ */
+ public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, bool $debug = false, AmqpFactory $amqpFactory = null)
{
- $this->connectionCredentials = $connectionCredentials;
+ $this->connectionConfiguration = array_replace_recursive(array(
+ 'retry' => array(
+ 'routing_key_pattern' => 'attempt_%attempt%',
+ 'dead_routing_key' => 'dead',
+ 'exchange_name' => 'retry',
+ 'queue_name_pattern' => 'retry_queue_%attempt%',
+ ),
+ ), $connectionConfiguration);
$this->debug = $debug;
$this->exchangeConfiguration = $exchangeConfiguration;
$this->queueConfiguration = $queueConfiguration;
@@ -101,6 +145,108 @@ public function publish(string $body, array $headers = array()): void
$this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers));
}
+ /**
+ * @throws \AMQPException
+ */
+ public function publishForRetry(\AMQPEnvelope $message): bool
+ {
+ if (!isset($this->connectionConfiguration['retry'])) {
+ return false;
+ }
+
+ $retryConfiguration = $this->connectionConfiguration['retry'];
+ $attemptNumber = ((int) $message->getHeader('symfony-messenger-attempts') ?: 0) + 1;
+
+ if ($this->shouldSetup()) {
+ $this->setupRetry($retryConfiguration, $attemptNumber);
+ }
+
+ $maximumAttempts = $retryConfiguration['attempts'] ?? self::DEFAULT_MAX_ATTEMPTS;
+ $routingKey = str_replace('%attempt%', $attemptNumber, $retryConfiguration['routing_key_pattern']);
+
+ if ($attemptNumber > $maximumAttempts) {
+ if (!isset($retryConfiguration['dead_queue'])) {
+ return false;
+ }
+
+ $routingKey = $retryConfiguration['dead_routing_key'];
+ }
+
+ $retriedMessageAttributes = array(
+ 'headers' => array_merge($message->getHeaders(), array('symfony-messenger-attempts' => (string) $attemptNumber)),
+ );
+
+ if ($deliveryMode = $message->getDeliveryMode()) {
+ $retriedMessageAttributes['delivery_mode'] = $deliveryMode;
+ }
+ if ($userId = $message->getUserId()) {
+ $retriedMessageAttributes['user_id'] = $userId;
+ }
+ if (null !== $priority = $message->getPriority()) {
+ $retriedMessageAttributes['priority'] = $priority;
+ }
+ if ($replyTo = $message->getReplyTo()) {
+ $retriedMessageAttributes['reply_to'] = $replyTo;
+ }
+
+ $this->retryExchange($retryConfiguration)->publish(
+ $message->getBody(),
+ $routingKey,
+ AMQP_NOPARAM,
+ $retriedMessageAttributes
+ );
+
+ return true;
+ }
+
+ private function setupRetry(array $retryConfiguration, int $attemptNumber)
+ {
+ if (!$this->channel()->isConnected()) {
+ $this->clear();
+ }
+
+ $exchange = $this->retryExchange($retryConfiguration);
+ $exchange->declareExchange();
+
+ $queue = $this->retryQueue($retryConfiguration, $attemptNumber);
+ $queue->declareQueue();
+ $queue->bind($exchange->getName(), str_replace('%attempt%', $attemptNumber, $retryConfiguration['routing_key_pattern']));
+
+ if (isset($retryConfiguration['dead_queue'])) {
+ $queue = $this->amqpFactory->createQueue($this->channel());
+ $queue->setName($retryConfiguration['dead_queue']);
+ $queue->declareQueue();
+ $queue->bind($exchange->getName(), $retryConfiguration['dead_routing_key']);
+ }
+ }
+
+ private function retryExchange(array $retryConfiguration): \AMQPExchange
+ {
+ if (null === $this->amqpRetryExchange) {
+ $this->amqpRetryExchange = $this->amqpFactory->createExchange($this->channel());
+ $this->amqpRetryExchange->setName($retryConfiguration['exchange_name']);
+ $this->amqpRetryExchange->setType(AMQP_EX_TYPE_DIRECT);
+ }
+
+ return $this->amqpRetryExchange;
+ }
+
+ private function retryQueue(array $retryConfiguration, int $attemptNumber)
+ {
+ $queue = $this->amqpFactory->createQueue($this->channel());
+ $queue->setName(str_replace('%attempt%', $attemptNumber, $retryConfiguration['queue_name_pattern']));
+ $queue->setArguments(array(
+ 'x-message-ttl' => $retryConfiguration['ttl'][$attemptNumber - 1] ?? self::DEFAULT_MESSAGE_TTL_IN_MILLI_SECONDS,
+ 'x-dead-letter-exchange' => $this->exchange()->getName(),
+ ));
+
+ if (isset($this->queueConfiguration['routing_key'])) {
+ $queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']);
+ }
+
+ return $queue;
+ }
+
/**
* Waits and gets a message from the configured queue.
*
@@ -160,8 +306,8 @@ public function setup(): void
public function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {
- $connection = $this->amqpFactory->createConnection($this->connectionCredentials);
- $connectMethod = 'true' === ($this->connectionCredentials['persistent'] ?? 'false') ? 'pconnect' : 'connect';
+ $connection = $this->amqpFactory->createConnection($this->connectionConfiguration);
+ $connectMethod = 'true' === ($this->connectionConfiguration['persistent'] ?? 'false') ? 'pconnect' : 'connect';
if (false === $connection->{$connectMethod}()) {
throw new \AMQPException('Could not connect to the AMQP server. Please verify the provided DSN.');
@@ -204,9 +350,9 @@ public function exchange(): \AMQPExchange
return $this->amqpExchange;
}
- public function getConnectionCredentials(): array
+ public function getConnectionConfiguration(): array
{
- return $this->connectionCredentials;
+ return $this->connectionConfiguration;
}
private function clear(): void
@@ -218,6 +364,6 @@ private function clear(): void
private function shouldSetup(): bool
{
- return !array_key_exists('auto-setup', $this->connectionCredentials) || !\in_array($this->connectionCredentials['auto-setup'], array(false, 'false'), true);
+ return !array_key_exists('auto-setup', $this->connectionConfiguration) || !\in_array($this->connectionConfiguration['auto-setup'], array(false, 'false'), true);
}
}