10000 [Messenger] [Amqp] Handle AMQPConnectionException when publishing a m… · symfony/symfony@f123370 · GitHub
[go: up one dir, main page]

Skip to content

Commit f123370

Browse files
jwagechalasr
authored andcommitted
[Messenger] [Amqp] Handle AMQPConnectionException when publishing a message.
1 parent 182e93e commit f123370

File tree

2 files changed

+106
-13
lines changed

2 files changed

+106
-13
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,73 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn()
774774
);
775775
}
776776

777+
public function testItCanRetryPublishWhenAMQPConnectionExceptionIsThrown()
778+
{
779+
$factory = new TestAmqpFactory(
780+
$amqpConnection = $this->createMock(\AMQPConnection::class),
781+
$amqpChannel = $this->createMock(\AMQPChannel::class),
782+
$amqpQueue = $this->createMock(\AMQPQueue::class),
783+
$amqpExchange = $this->createMock(\AMQPExchange::class)
784+
);
785+
786+
$amqpExchange->expects($this->exactly(2))
787+
->method('publish')
788+
->willReturnOnConsecutiveCalls(
789+
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
790+
null
791+
);
792+
793+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
794+
$connection->publish('body');
795+
}
796+
797+
public function testItCanRetryPublishWithDelayWhenAMQPConnectionExceptionIsThrown()
798+
{
799+
$factory = new TestAmqpFactory(
800+
$amqpConnection = $this->createMock(\AMQPConnection::class),
801+
$amqpChannel = $this->createMock(\AMQPChannel::class),
802+
$amqpQueue = $this->createMock(\AMQPQueue::class),
803+
$amqpExchange = $this->createMock(\AMQPExchange::class)
804+
);
805+
806+
$amqpExchange->expects($this->exactly(2))
807+
->method('publish')
808+
->willReturnOnConsecutiveCalls(
809+
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
810+
null
811+
);
812+
813+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
814+
$connection->publish('body', [], 5000);
815+
}
816+
817+
public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown()
818+
{
819+
$factory = new TestAmqpFactory(
820+
$amqpConnection = $this->createMock(\AMQPConnection::class),
821+
$amqpChannel = $this->createMock(\AMQPChannel::class),
822+
$amqpQueue = $this->createMock(\AMQPQueue::class),
823+
$amqpExchange = $this->createMock(\AMQPExchange::class)
824+
);
825+
826+
$exception = new \AMQPConnectionException('a socket error occurred');
827+
828+
$amqpExchange->expects($this->exactly(4))
829+
->method('publish')
830+
->willReturnOnConsecutiveCalls(
831+
$this->throwException($exception),
832+
$this->throwException($exception),
833+
$this->throwException($exception),
834+
$this->throwException($exception),
835+
);
836+
837+
self::expectException($exception::class);
838+
self::expectExceptionMessage($exception->getMessage());
839+
840+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
841+
$connection->publish('body');
842+
}
843+
777844
private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
778845
{
779846
$amqpConnection = $this->createMock(\AMQPConnection::class);

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -287,19 +287,21 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, ?
287287
$this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
288288
}
289289

290-
if (0 !== $delayInMs) {
291-
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
290+
$this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) {
291+
if (0 !== $delayInMs) {
292+
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
292293

293-
return;
294-
}
294+
return;
295+
}
295296

296-
$this->publishOnExchange(
297- $this->exchange(),
298-
$body,
299-
$this->getRoutingKeyForMessage($amqpStamp),
300-
$headers,
301-
$amqpStamp
302-
);
297+
$this->publishOnExchange(
298+
$this->exchange(),
299+
$body,
300+
$this->getRoutingKeyForMessage($amqpStamp),
301+
$headers,
302+
$amqpStamp
303+
);
304+
});
303305
}
304306

305307
/**
@@ -545,11 +547,16 @@ public function exchange(): \AMQPExchange
545547
private function clearWhenDisconnected(): void
546548
{
547549
if (!$this->channel()->isConnected()) {
548-
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
549-
$this->amqpQueues = [];
550+
$this->clear();
550551
}
551552
}
552553

554+
private function clear(): void
555+
{
556+
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
557+
$this->amqpQueues = [];
558+
}
559+
553560
private function getDefaultPublishRoutingKey(): ?string
554561
{
555562
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
@@ -566,4 +573,23 @@ private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
566573
{
567574
return $amqpStamp?->getRoutingKey() ?? $this->getDefaultPublishRoutingKey();
568575
}
576+
577+
private function withConnectionExceptionRetry(callable $callable): void
578+
{
579+
$maxRetries = 3;
580+
$retries = 0;
581+
582+
retry:
583+
try {
584+
$callable();
585+
} catch (\AMQPConnectionException $e) {
586+
if (++$retries <= $maxRetries) {
587+
$this->clear();
588+
589+
goto retry;
590+
}
591+
592+
throw $e;
593+
}
594+
}
569595
}

0 commit comments

Comments
 (0)
0