8000 [Messenger] Implement Safty counter for permanent lock · symfony/symfony@ca322e5 · GitHub
[go: up one dir, main page]

Skip to content

Commit ca322e5

Browse files
committed
[Messenger] Implement Safty counter for permanent lock
1 parent 864b51d commit ca322e5

File tree

3 files changed

+31
-2
lines changed

3 files changed

+31
-2
lines changed

src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use PHPUnit\Framework\TestCase;
1717
use Symfony\Component\Messenger\Envelope;
1818
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
19+
use Symfony\Component\Messenger\Exception\TransportException;
1920
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2021
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
2122
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
@@ -74,13 +75,32 @@ public function testOccursRetryableExceptionFromConnection()
7475
{
7576
$serializer = $this->createSerializer();
7677
$connection = $this->createMock(Connection::class);
78+
$connection->method('getConfiguration')->willReturn([
79+
'redelivery_max_count' => 3
80+
]);
7781
$driverException = new PDOException(new \PDOException('Deadlock', 40001));
7882
$connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException));
7983
$receiver = new DoctrineReceiver($connection, $serializer);
8084
$actualEnvelopes = $receiver->get();
8185
$this->assertSame([], $actualEnvelopes);
8286
}
8387

88+
public function testOccursRetryableExceptionAfterMaxCountFromConnection()
89+
{
90+
$serializer = $this->createSerializer();
91+
$connection = $this->createMock(Connection::class);
92+
$connection->method('getConfiguration')->willReturn([
93+
'redelivery_max_count' => 2
94+
]);
95+
$driverException = new PDOException(new \PDOException('Deadlock', 40001));
96+
$connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException));
97+
$receiver = new DoctrineReceiver($connection, $serializer);
98+
$actualEnvelopes = $receiver->get();
99+
$this->assertSame([], $actualEnvelopes);
100+
$this->expectException(TransportException::class);
101+
$receiver->get();
102+
}
103+
84104
public function testAll()
85105
{
86106
$serializer = $this->createSerializer();

src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class Connection
3636
'queue_name' => 'default',
3737
'redeliver_timeout' => 3600,
3838
'auto_setup' => true,
39+
'redelivery_max_count' => 3,
3940
];
4041

4142
/**
@@ -79,6 +80,7 @@ public static function buildConfiguration($dsn, array $options = [])
7980
$configuration = ['connection' => $components['host']];
8081
$configuration += $options + $query + self::DEFAULT_OPTIONS;
8182

83+
$configuration['redelivery_max_count'] = filter_var($configuration['redelivery_max_count'], FILTER_VALIDATE_INT);
8284
$configuration['auto_setup'] = filter_var($configuration['auto_setup'], FILTER_VALIDATE_BOOLEAN);
8385

8486
// check for extra keys in options

src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
*/
3232
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
3333
{
34+
private $retryingSafetyCounter = 0;
3435
private $connection;
3536
private $serializer;
3637

@@ -47,9 +48,15 @@ public function get(): iterable
4748
{
4849
try {
4950
$doctrineEnvelope = $this->connection->get();
51+
$this->retryingSafetyCounter = 0; // reset counter
5052
} catch (RetryableException $exception) {
51-
// Do nothing when RetryableException occurs.
52-
// Problem with concurent consumers and Database Deadlocks
53+
// Do nothing when RetryableException occurs less than "redelivery_max_count"
54+
// as it will likely be resolved on the next call to get()
55+
// Problem with concurrent consumers and database deadlocks
56+
if (++$this->retryingSafetyCounter >= $this->connection->getConfiguration()['redelivery_max_count']) {
57+
throw new TransportException($exception->getMessage(), 0, $exception);
58+
}
59+
5360
return [];
5461
} catch (DBALException $exception) {
5562
throw new TransportException($exception->getMessage(), 0, $exception);

0 commit comments

Comments
 (0)
0