8000 [Messenger] return empty envelopes when RetryableException occurs · Simperfit/symfony@9add32a · GitHub
[go: up one dir, main page]

Skip to content

Commit 9add32a

Browse files
surikmanTobion
authored andcommitted
[Messenger] return empty envelopes when RetryableException occurs
1 parent ccb3a4c commit 9add32a

File tree

3 files changed

+38
-1
lines changed

3 files changed

+38
-1
lines changed

src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111

1212
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
1313

14+
use Doctrine\DBAL\Driver\PDOException;
15+
use Doctrine\DBAL\Exception\DeadlockException;
1416
use PHPUnit\Framework\TestCase;
1517
use Symfony\Component\Messenger\Envelope;
1618
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
19+
use Symfony\Component\Messenger\Exception\TransportException;
1720
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1821
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1922
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
@@ -68,6 +71,26 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
6871
$receiver->get();
6972
}
7073

74+
public function testOccursRetryableExceptionFromConnection()
75+
{
76+
$serializer = $this->createSerializer();
77+
$connection = $this->createMock(Connection::class);
78+
$driverException = new PDOException(new \PDOException('Deadlock', 40001));
79+
$connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException));
80+
$receiver = new DoctrineReceiver($connection, $serializer);
81+
$this->assertSame([], $receiver->get());
82+
$this->assertSame([], $receiver->get());
83+
try {
84+
$receiver->get();
85+
} catch (TransportException $exception) {
86+
// skip, and retry
87+
}
88+
$this->assertSame([], $receiver->get());
89+
$this->assertSame([], $receiver->get());
90+
$this->expectException(TransportException::class);
91+
$receiver->get();
92+
}
93+
7194
public function testAll()
7295
{
7396
$serializer = $this->createSerializer();

src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Transport\Doctrine;
1313

1414
use Doctrine\DBAL\DBALException;
15+
use Doctrine\DBAL\Exception\RetryableException;
1516
use Symfony\Component\Messenger\Envelope;
1617
use Symfony\Component\Messenger\Exception\LogicException;
1718
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@@ -30,6 +31,8 @@
3031
*/
3132
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
3233
{
34+
private const MAX_RETRIES = 3;
35+
private $retryingSafetyCounter = 0;
3336
private $connection;
3437
private $serializer;
3538

@@ -46,6 +49,17 @@ public function get(): iterable
4649
{
4750
try {
4851
$doctrineEnvelope = $this->connection->get();
52+
$this->retryingSafetyCounter = 0; // reset counter
53+
} catch (RetryableException $exception) {
54+
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
55+
// as it will likely be resolved on 8000 the next call to get()
56+
// Problem with concurrent consumers and database deadlocks
57+
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
58+
$this->retryingSafetyCounter = 0; // reset counter
59+
throw new TransportException($exception->getMessage(), 0, $exception);
60+
}
61+
62+
return [];
4963
} catch (DBALException $exception) {
5064
throw new TransportException($exception->getMessage(), 0, $exception);
5165
}

src/Symfony/Component/Messenger/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"psr/log": "~1.0"
2121
},
2222
"require-dev": {
23-
"doctrine/dbal": "^2.5",
23+
"doctrine/dbal": "^2.6",
2424
"psr/cache": "~1.0",
2525
"symfony/console": "~3.4|~4.0",
2626
"symfony/debug": "~4.1",

0 commit comments

Comments
 (0)
0