8000 bug #32979 [Messenger] return empty envelopes when RetryableException… · symfony/symfony@abf11d8 · GitHub
[go: up one dir, main page]

Skip to content

Commit abf11d8

Browse files
committed
bug #32979 [Messenger] return empty envelopes when RetryableException occurs (surikman)
This PR was squashed before being merged into the 4.3 branch (closes #32979). Discussion ---------- [Messenger] return empty envelopes when RetryableException occurs | Q | A | ------------- | --- | Branch? | 3.4 or 4.3 for bug fixes <!-- see below --> | Bug fix? | yes | New feature? | no <!-- please update src/**/CHANGELOG.md files --> | BC breaks? | no <!-- see https://symfony.com/bc --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tests pass? | yes <!-- please add some, will be required by reviewers --> | License | MIT | ~~Doc PR~~ | ~~symfony/symfony-docs#12109~~ <!-- Replace this notice by a short README for your feature/bugfix. This will help people understand your PR and can be used as a start for the documentation. Additionally (see https://symfony.com/roadmap): - Bug fixes must be submitted against the lowest maintained branch where they apply (lowest branches are regularly merged to upper ones so they get the fixes too). - Features and deprecations must be submitted against branch 4.4. - Legacy code removals go to the master branch. --> Problem occurs when you are using more than 1 worker with Doctrine Transport. `Symfony\Component\Messenger\Transport\Doctrine\Connection::get` does a query `SELECT ... FOR UPDATE` and this locking query could lock table and workers stops. But using locks can result in dead locks or lock timeouts. Doctrine renders these SQL errors as RetryableExceptions. These exceptions are often normal if you are in a high concurrency environment. They can happen very often and your application should handle them properly. Commits ------- 9add32a [Messenger] return empty envelopes when RetryableException occurs
2 parents dff71ce + 9add32a commit abf11d8

File tree

3 files changed

+38
-1
lines changed

3 files changed

+38
-1
lines changed

src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111

1212
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
1313

14+
use Doctrine\DBAL\Driver\PDOException;
15+
use Doctrine\DBAL\Exception\DeadlockException;
1416
use PHPUnit\Framework\TestCase;
1517
use Symfony\Component\Messenger\Envelope;
1618
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
19+
use Symfony\Component\Messenger\Exception\TransportException;
1720
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1821
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1922
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
@@ -68,6 +71,26 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
6871
$receiver->get();
6972
}
7073

74+
public function testOccursRetryableExceptionFromConnection()
75+
{
76+
$serializer = $this->createSerializer();
77+
$connection = $this->createMock(Connection::class);
78+
$driverException = new PDOException(new \PDOException('Deadlock', 40001));
79+
$connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException));
80+
$receiver = new DoctrineReceiver($connection, $serializer);
81+
$this->assertSame([], $receiver->get());
82+
$this->assertSame([], $receiver->get());
83+
try {
84+
$receiver->get();
85+
} catch (TransportException $exception) {
86+
// skip, and retry
87+
}
88+
$this->assertSame([], $receiver->get());
89+
$this->assertSame([], $receiver->get());
90+
$this->expectException(TransportException::class);
91+
$receiver->get();
92+
}
93+
7194
public function testAll()
7295
{
7396
$serializer = $this->createSerializer();

src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Transport\Doctrine;
1313

1414
use Doctrine\DBAL\DBALException;
15+
use Doctrine\DBAL\Exception\RetryableException;
1516
use Symfony\Component\Messenger\Envelope;
1617
use Symfony\Component\Messenger\Exception\LogicException;
1718
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@@ -30,6 +31,8 @@
3031
*/
3132
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
3233
{
34+
private const MAX_RETRIES = 3;
35+
private $retryingSafetyCounter = 0;
3336
private $connection;
3437
private $serializer;
3538

@@ -46,6 +49,17 @@ public function get(): iterable
4649
{
4750
try {
4851
$doctrineEnvelope = $this->connection->get();
52+
$this->retryingSafetyCounter = 0; // reset counter
53+
} catch (RetryableException $exception) {
54+
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
55+
// as it will likely be resolved on the next call to get()
56+
// Problem with concurrent consumers and database deadlocks
57+
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
58+
$this->retryingSafetyCounter = 0; // reset counter
59+
throw new TransportException($exception->getMessage(), 0, $exception);
60+
}
61+
62+
return [];
4963
} catch (DBALException $exception) {
5064
throw new TransportException($exception->getMessage(), 0, $exception);
5165
}

src/Symfony/Component/Messenger/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"psr/log": "~1.0"
2121
},
2222
"require-dev": {
23-
"doctrine/dbal": "^2.5",
23+
"doctrine/dbal": "^2.6",
2424
"psr/cache": "~1.0",
2525
"symfony/console": "~3.4|~4.0",
2626
"symfony/debug": "~4.1",

0 commit comments

Comments
 (0)
0