8000 bug #54167 [Messenger] [Amqp] Handle AMQPConnectionException when pub… · fancyweb/symfony@e43b198 · GitHub
[go: up one dir, main page]

Skip to content

Commit e43b198

Browse files
committed
bug symfony#54167 [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message. (jwage)
This PR was squashed before being merged into the 6.4 branch. Discussion ---------- [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message. | Q | A | ------------- | --- | Branch? | 6.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Issues | Fix symfony#36538 Fix symfony#48241 | License | MIT If you have a message handler that dispatches messages to another queue, you can encounter `AMQPConnectionException` with the message "Library error: a SSL error occurred" or "a socket error occurred" depending on if you are using tls or not or if you are running behind a load balancer or not. You can manually reproduce this issue by dispatching a message where the handler then dispatches another message to a different queue, then go to rabbitmq admin and close the connection manually, then dispatch another message and when the message handler goes to dispatch the other message, you will get this exception: ``` a socket error occurred #0 /vagrant/vendor/symfony/amqp-messenger/Transport/AmqpTransport.php(60): Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender->send() #1 /vagrant/vendor/symfony/messenger/Middleware/SendMessageMiddleware.php(62): Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransport->send() #2 /vagrant/vendor/symfony/messenger/Middleware/FailedMessageProcessingMiddleware.php(34): Symfony\Component\Messenger\Middleware\SendMessageMiddleware->handle() #3 /vagrant/vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php(61): Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware->handle() #4 /vagrant/vendor/symfony/messenger/Middleware/RejectRedeliveredMessageMiddleware.php(41): Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware->handle() symfony#5 /vagrant/vendor/symfony/messenger/Middleware/AddBusNameStampMiddleware.php(37): Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware->handle() symfony#6 /vagrant/vendor/symfony/messenger/Middleware/TraceableMiddleware.php(40): Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware->handle() symfony#7 /vagrant/vendor/symfony/messenger/MessageBus.php(70): Symfony\Component\Messenger\Middleware\TraceableMiddleware->handle() symfony#8 /vagrant/vendor/symfony/messenger/TraceableMessageBus.php(38): Symfony\Component\Messenger\MessageBus->dispatch() symfony#9 /vagrant/src/Messenger/MessageBus.php(37): Symfony\Component\Messenger\TraceableMessageBus->dispatch() symfony#10 /vagrant/vendor/symfony/mailer/Mailer.php(66): App\Messenger\MessageBus->dispatch() symfony#11 /vagrant/src/Mailer/Mailer.php(83): Symfony\Component\Mailer\Mailer->send() symfony#12 /vagrant/src/Mailer/Mailer.php(96): App\Mailer\Mailer->send() symfony#13 /vagrant/src/MessageHandler/Trading/StrategySubscriptionMessageHandler.php(118): App\Mailer\Mailer->sendEmail() symfony#14 /vagrant/src/MessageHandler/Trading/StrategySubscriptionMessageHandler.php(72): App\MessageHandler\Trading\StrategySubscriptionMessageHandler->handle() symfony#15 /vagrant/vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php(152): App\MessageHandler\Trading\StrategySubscriptionMessageHandler->__invoke() symfony#16 /vagrant/vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php(91): Symfony\Component\Messenger\Middleware\HandleMessageMiddleware->callHandler() symfony#17 /vagrant/vendor/symfony/messenger/Middleware/SendMessageMiddleware.php(71): Symfony\Component\Messenger\Middleware\HandleMessageMiddleware->handle() symfony#18 /vagrant/vendor/symfony/messenger/Middleware/FailedMessageProcessingMiddleware.php(34): Symfony\Component\Messenger\Middleware\SendMessageMiddleware->handle() symfony#19 /vagrant/vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php(68): Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware->handle() symfony#20 /vagrant/vendor/symfony/messenger/Middleware/RejectRedeliveredMessageMiddleware.php(41): Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware->handle() symfony#21 /vagrant/vendor/symfony/messenger/Middleware/AddBusNameStampMiddleware.php(37): Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware->handle() symfony#22 /vagrant/vendor/symfony/messenger/Middleware/TraceableMiddleware.php(40): Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware->handle() symfony#23 /vagrant/vendor/symfony/messenger/MessageBus.php(70): Symfony\Component\Messenger\Middleware\TraceableMiddleware->handle() symfony#24 /vagrant/vendor/symfony/messenger/TraceableMessageBus.php(38): Symfony\Component\Messenger\MessageBus->dispatch() symfony#25 /vagrant/vendor/symfony/messenger/RoutableMessageBus.php(54): Symfony\Component\Messenger\TraceableMessageBus->dispatch() symfony#26 /vagrant/vendor/symfony/messenger/Worker.php(162): Symfony\Component\Messenger\RoutableMessageBus->dispatch() symfony#27 /vagrant/vendor/symfony/messenger/Worker.php(109): Symfony\Component\Messenger\Worker->handleMessage() symfony#28 /vagrant/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php(238): Symfony\Component\Messenger\Worker->run() symfony#29 /vagrant/vendor/symfony/console/Command/Command.php(326): Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() symfony#30 /vagrant/vendor/symfony/console/Application.php(1096): Symfony\Component\Console\Command\Command->run() symfony#31 /vagrant/vendor/symfony/framework-bundle/Console/Application.php(126): Symfony\Component\Console\Application->doRunCommand() symfony#32 /vagrant/vendor/symfony/console/Application.php(324): Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() symfony#33 /vagrant/vendor/symfony/framework-bundle/Console/Application.php(80): Symfony\Component\Console\Application->doRun() symfony#34 /vagrant/vendor/symfony/console/Application.php(175): Symfony\Bundle\FrameworkBundle\Console\Application->doRun() symfony#35 /vagrant/vendor/symfony/runtime/Runner/Symfony/ConsoleApplicationRunner.php(49): Symfony\Component\Console\Application->run() symfony#36 /vagrant/vendor/autoload_runtime.php(29): Symfony\Component\Runtime\Runner\Symfony\ConsoleApplicationRunner->run() symfony#37 /vagrant/bin/console(11): require_once('...') symfony#38 {main} ``` TODO: - [x] Add test for retry logic when publishing messages Commits ------- f123370 [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message.
2 parents 0523300 + f123370 commit e43b198

File tree

2 files changed

+106
-13
lines changed

2 files changed

+106
-13
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,73 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn()
774774
);
775775
}
776776

777+
public function testItCanRetryPublishWhenAMQPConnectionExceptionIsThrown()
778+
{
779+
$factory = new TestAmqpFactory(
780+
$amqpConnection = $this->createMock(\AMQPConnection::class),
781+
$amqpChannel = $this->createMock(\AMQPChannel::class),
782+
$amqpQueue = $this->createMock(\AMQPQueue::class),
783+
$amqpExchange = $this->createMock(\AMQPExchange::class)
784+
);
785+
786+
$amqpExchange->expects($this->exactly(2))
787+
->method('publish')
788+
->willReturnOnConsecutiveCalls(
789+
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
790+
null
791+
);
792+
793+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
794+
$connection->publish('body');
795+
}
796+
797+
public function testItCanRetryPublishWithDelayWhenAMQPConnectionExceptionIsThrown()
798+
{
799+
$factory = new TestAmqpFactory(
800+
$amqpConnection = $this->createMock(\AMQPConnection::class),
801+
$amqpChannel = $this->createMock(\AMQPChannel::class),
802+
$amqpQueue = $this->createMock(\AMQPQueue::class),
803+
$amqpExchange = $this->createMock(\AMQPExchange::class)
804+
);
805+
806+
$amqpExchange->expects($this->exactly(2))
807+
->method('publish')
808+
->willReturnOnConsecutiveCalls(
809+
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
810+
null
811+
);
812+
813+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
814+
$connection->publish('body', [], 5000);
815+
}
816+
817+
public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown()
818+
{
819+
$factory = new TestAmqpFactory(
820+
$amqpConnection = $this->createMock(\AMQPConnection::class),
821+
$amqpChannel = $this->createMock(\AMQPChannel::class),
822+
$amqpQueue = $this->createMock(\AMQPQueue::class),
823+
$amqpExchange = $this->createMock(\AMQPExchange::class)
824+
);
825+
826+
$exception = new \AMQPConnectionException('a socket error occurred');
827+
828+
$amqpExchange->expects($this->exactly(4))
829+
->method('publish')
830+
->willReturnOnConsecutiveCalls(
831+
$this->throwException($exception),
832+
$this->throwException($exception),
833+
$this->throwException($exception),
834+
$this->throwException($exception),
835+
);
836+
837+
self::expectException($exception::class);
838+
self::expectExceptionMessage($exception->getMessage());
839+
840+
6D47 $connection = Connection::fromDsn('amqp://localhost', [], $factory);
841+
$connection->publish('body');
842+
}
843+
777844
private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
778845
{
779846
$amqpConnection = $this->createMock(\AMQPConnection::class);

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -287,19 +287,21 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, ?
287287
$this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
288288
}
289289

290-
if (0 !== $delayInMs) {
291-
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
290+
$this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) {
291+
if (0 !== $delayInMs) {
292+
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
292293

293-
return;
294-
}
294+
return;
295+
}
295296

296-
$this->publishOnExchange(
297-
$this->exchange(),
298-
$body,
299-
$this->getRoutingKeyForMessage($amqpStamp),
300-
$headers,
301-
$amqpStamp
302-
);
297+
$this->publishOnExchange(
298+
$this->exchange(),
299+
$body,
300+
$this->getRoutingKeyForMessage($amqpStamp),
301+
$headers,
302+
$amqpStamp
303+
);
304+
});
303305
}
304306

305307
/**
@@ -545,11 +547,16 @@ public function exchange(): \AMQPExchange
545547
private function clearWhenDisconnected(): void
546548
{
547549
if (!$this->channel()->isConnected()) {
548-
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
549-
$this->amqpQueues = [];
550+
$this->clear();
550551
}
551552
}
552553

554+
private function clear(): void
555+
{
556+
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
557+
$this->amqpQueues = [];
558+
}
559+
553560
private function getDefaultPublishRoutingKey(): ?string
554561
{
555562
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
@@ -566,4 +573,23 @@ private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
566573
{
567574
return $amqpStamp?->getRoutingKey() ?? $this->getDefaultPublishRoutingKey();
568575
}
576+
577+
private function withConnectionExceptionRetry(callable $callable): void
578+
{
579+
$maxRetries = 3;
580+
$retries = 0;
581+
582+
retry:
583+
try {
584+
$callable();
585+
} catch (\AMQPConnectionException $e) {
586+
if (++$retries <= $maxRetries) {
587+
$this->clear();
588+
589+
goto retry;
590+
}
591+
592+
throw $e;
593+
}
594+
}
569595
}

0 commit comments

Comments
 (0)
0