10000 bug #54105 [Messenger] Improve deadlock handling on `ack()` and `reje… · symfony/symfony@3b9f6b3 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3b9f6b3

Browse files
committed
bug #54105 [Messenger] Improve deadlock handling on ack() and reject() (jwage)
This PR was squashed before being merged into the 6.4 branch. Discussion ---------- [Messenger] Improve deadlock handling on `ack()` and `reject()` | Q | A | ------------- | --- | Branch? | 6.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Issues | Fix #54103 | License | MIT We started getting this deadlock recently. It has happened only twice so far under high load. ``` SQLSTATE[40P01]: Deadlock detected: 7 ERROR: deadlock detected DETAIL: Process 221664 waits for ShareLock on transaction 59539641; blocked by process 221671. Process 221671 waits for AccessExclusiveLock on tuple (77,27) of relation 16455 of database 16385; blocked by process 221605. Process 221605 waits for ShareLock on transaction 59539646; blocked by process 221606. Process 221606 waits for AccessExclusiveLock on tuple (69,16) of relation 16455 of database 16385; blocked by process 221664. HINT: See server log for query details. CONTEXT: while deleting tuple (69,16) in relation "messenger_messages" Process 221664 waits for ShareLock on transaction 59539641; blocked by process 221671. ``` Here are the queries for each process: ``` Process 221664 waits for ShareLock on transaction 59539641; blocked by process 221671. 221671 SELECT m.* FROM messenger_messages m WHERE (m.queue_name = $1) AND (m.delivered_at is null OR m.delivered_at < $2) AND (m.available_at <= $3) ORDER BY available_at ASC LIMIT 1 FOR UPDATE 221605 SELECT m.* FROM messenger_messages m WHERE (m.queue_name = $1) AND (m.delivered_at is null OR m.delivered_at < $2) AND (m.available_at <= $3) ORDER BY available_at ASC LIMIT 1 FOR UPDATE 221606 SELECT m.* FROM messenger_messages m WHERE (m.queue_name = $1) AND (m.delivered_at is null OR m.delivered_at < $2) AND (m.available_at <= $3) ORDER BY available_at ASC LIMIT 1 FOR UPDATE 221664 DELETE FROM messenger_messages WHERE id = $1 ``` Open for discussion if this is the right way to handle this or not. TODO: - [x] Should there be a retry delay/exponential backoff/jitter? Retrying the failed delete that deadlocked immediately may not help. - [x] Should `skip_locked` even be an option or should we always use skip locked? - [x] Should we add `SKIP LOCKED` to the `FOR UPDATE`? It will reduce contention further. I was looking at how SolidQueue in Ruby On Rails handles this and it appears they use `SKIP LOCKED FOR UPDATE` https://github.com/basecamp/solid_queue/blob/fe57349a126efc381fe0adf4c1ec444bd8a4f53f/app/models/solid_queue/record.rb#L11 Commits ------- 38b67e7 [Messenger] Improve deadlock handling on `ack()` and `reject()`
2 parents 9c4adba + 38b67e7 commit 3b9f6b3

File tree

6 files changed

+331
-13
lines changed

6 files changed

+331
-13
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Doctrine\DBAL\Platforms\OraclePlatform;
2222
use Doctrine\DBAL\Platforms\SQLServer2012Platform;
2323
use Doctrine\DBAL\Platforms\SQLServerPlatform;
24+
use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode;
2425
use Doctrine\DBAL\Query\QueryBuilder;
2526
use Doctrine\DBAL\Result;
2627
use Doctrine\DBAL\Schema\AbstractSchemaManager;
@@ -99,6 +100,82 @@ public function testGetWithNoPendingMessageWillReturnNull()
99100
$this->assertNull($doctrineEnvelope);
100101
}
101102

103+
public function testGetWithSkipLockedWithForUpdateMethod()
104+
{
105+
if (!method_exists(QueryBuilder::class, 'forUpdate')) {
106+
$this->markTestSkipped('This test is for when forUpdate method exists.');
107+
}
108+
109+
$queryBuilder = $this->getQueryBuilderMock();
110+
$driverConnection = $this->getDBALConnectionMock();
111+
$stmt = $this->getResultMock(false);
112+
113+
$queryBuilder
114+
->method('getParameters')
115+
->willReturn([]);
116+
$queryBuilder
117+
->method('getParameterTypes')
118+
->willReturn([]);
119+
$queryBuilder
120+
->method('forUpdate')
121+
->with(ConflictResolutionMode::SKIP_LOCKED)
122+
->willReturn($queryBuilder);
123+
$queryBuilder
124+
->method('getSQL')
125+
->willReturn('SELECT FOR UPDATE SKIP LOCKED');
126+
$driverConnection->expects($this->once())
127+
->method('createQueryBuilder')
128+
->willReturn($queryBuilder);
129+
$driverConnection->expects($this->never())
130+
->method('update');
131+
$driverConnection
132+
->method('executeQuery')
133+
->with($this->callback(function ($sql) {
134+
return str_contains($sql, 'SKIP LOCKED');
135+
}))
136+
->willReturn($stmt);
137+
138+
$connection = new Connection(['skip_locked' => true], $driverConnection);
139+
$doctrineEnvelope = $connection->get();
140+
$this->assertNull($doctrineEnvelope);
141+
}
142+
143+
public function testGetWithSkipLockedWithoutForUpdateMethod()
144+
{
145+
if (method_exists(QueryBuilder::class, 'forUpdate')) {
146+
$this->markTestSkipped('This test is for when forUpdate method does not exist.');
147+
}
148+
149+
$queryBuilder = $this->getQueryBuilderMock();
150+
$driverConnection = $this->getDBALConnectionMock();
151+
$stmt = $this->getResultMock(false);
152+
153+
$queryBuilder
154+
->method('getParameters')
155+
->willReturn([]);
156+
$queryBuilder
157+
->method('getParameterTypes')
158+
->willReturn([]);
159+
$queryBuilder
160+
->method('getSQL')
161+
->willReturn('SELECT');
162+
$driverConnection->expects($this->once())
163+
->method('createQueryBuilder')
164+
->willReturn($queryBuilder);
165+
$driverConnection->expects($this->never())
166+
->method('update');
167+
$driverConnection
168+
->method('executeQuery')
169+
->with($this->callback(function ($sql) {
170+
return str_contains($sql, 'SKIP LOCKED');
171+
}))
172+
->willReturn($stmt);
173+
174+
$connection = new Connection(['skip_locked' => true], $driverConnection);
175+
$doctrineEnvelope = $connection->get();
176+
$this->assertNull($doctrineEnvelope);
177+
}
178+
102179
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
103180
{
104181
$this->expectException(TransportException::class);
@@ -507,20 +584,20 @@ class_exists(MySQLPlatform::class) ? new MySQLPlatform() : new MySQL57Platform()
507584

508585
yield 'SQL Server' => [
509586
class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ? new SQLServerPlatform() : new SQLServer2012Platform(),
510-
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
587+
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK, READPAST) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
511588
];
512589

513590
if (!class_exists(MySQL57Platform::class)) {
514591
// DBAL >= 4
515592
yield 'Oracle' => [
516593
new OraclePlatform(),
517-
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE',
594+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE SKIP LOCKED',
518595
];
519596
} else {
520597
// DBAL < 4
521598
yield 'Oracle' => [
522599
new OraclePlatform(),
523-
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE',
600+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE SKIP LOCKED',
524601
];
525602
}
526603
}

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,19 @@ public function testPostgreSqlConnectionSendAndGet()
6666
$this->assertNull($this->connection->get());
6767
}
6868

69+
public function testSkipLocked()
70+
{
71+
$connection = new PostgreSqlConnection(['table_name' => 'queue_table', 'skip_locked' => true], $this->driverConnection);
72+
73+
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
74+
75+
$encoded = $connection->get();
76+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
77+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
78+
79+
$this->assertNull($connection->get());
80+
}
81+
6982
private function createSchemaManager(): AbstractSchemaManager
7083
{
7184
return method_exists($this->driverConnection, 'createSchemaManager')

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,20 @@ public function testSendAndGetWithAutoSetupEnabledAndSetupAlready()
5757
$this->assertNull($this->connection->get());
5858
}
5959

60+
public function testSendAndGetWithSkipLockedEnabled()
61+
{
62+
$connection = new Connection(['table_name' => 'queue_table', 'skip_locked' => true], $this->driverConnection);
63+
$connection->setup();
64+
65+
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
66+
67+
$encoded = $connection->get();
68+
$this->assertSame('{"message": "Hi"}', $encoded['body']);
69+
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);
70+
71+
$this->assertNull($this->connection->get());
72+
}
73+
6074
protected function setUp(): void
6175
{
6276
if (!$host = getenv('POSTGRES_HOST')) {

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,188 @@ public function testFind()
128128
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
129129
}
130130

131+
public function testAck()
132+
{
133+
$serializer = $this->createSerializer();
134+
$connection = $this->createMock(Connection::class);
135+
136+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
137+
$receiver = new DoctrineReceiver($connection, $serializer);
138+
139+
$connection
140+
->expects($this->once())
141+
->method('ack')
142+
->with('1')
143+
->willReturn(true);
144+
145+
$receiver->ack($envelope);
146+
}
147+
148+
public function testAckThrowsRetryableException()
149+
{
150+
$serializer = $this->createSerializer();
151+
$connection = $this->createMock(Connection::class);
152+
153+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
154+
$receiver = new DoctrineReceiver($connection, $serializer);
155+
156+
$driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001));
157+
if (!class_exists(Version::class)) {
158+
// This is doctrine/dbal 3.x
159+
$deadlockException = new DeadlockException($driverException, null);
160+
} else {
161+
$deadlockException = new DeadlockException('Deadlock', $driverException);
162+
}
163+
164+
$connection
165+
->expects($this->exactly(2))
166+
->method('ack')
167+
->with('1')
168+
->willReturnOnConsecutiveCalls(
169+
$this->throwException($deadlockException),
170+
true,
171+
);
172+
173+
$receiver->ack($envelope);
174+
}
175+
176+
public function testAckThrowsRetryableExceptionAndRetriesFail()
177+
{
178+
$serializer = $this->createSerializer();
179+
$connection = $this->createMock(Connection::class);
180+
181+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
182+
$receiver = new DoctrineReceiver($connection, $serializer);
183+
184+
$driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001));
185+
if (!class_exists(Version::class)) {
186+
// This is doctrine/dbal 3.x
187+
$deadlockException = new DeadlockException($driverException, null);
188+
} else {
189+
$deadlockException = new DeadlockException('Deadlock', $driverException);
190+
}
191+
192+
$connection
193+
->expects($this->exactly(4))
194+
->method('ack')
195+
->with('1')
196+
->willThrowException($deadlockException);
197+
198+
self::expectException(TransportException::class);
199+
$receiver->ack($envelope);
200+
}
201+
202+
public function testAckThrowsException()
203+
{
204+
$serializer = $this->createSerializer();
205+
$connection = $this->createMock(Connection::class);
206+
207+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
208+
$receiver = new DoctrineReceiver($connection, $serializer);
209+
210+
$exception = new \RuntimeException();
211+
212+
$connection
213+
->expects($this->once())
214+
->method('ack')
215+
->with('1')
216+
->willThrowException($exception);
217+
218+
self::expectException($exception::class);
219+
$receiver->ack($envelope);
220+
}
221+
222+
public function testReject()
223+
{
224+
$serializer = $this->createSerializer();
225+
$connection = $this->createMock(Connection::class);
226+
227+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
228+
$receiver = new DoctrineReceiver($connection, $serializer);
229+
230+
$connection
231+
->expects($this->once())
232+
->method('reject')
233+
->with('1')
234+
->willReturn(true);
235+
236+
$receiver->reject($envelope);
237+
}
238+
239+
public function testRejectThrowsRetryableException()
240+
{
241+
$serializer = $this->createSerializer();
242+
$connection = $this->createMock(Connection::class);
243+
244+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
245+
$receiver = new DoctrineReceiver($connection, $serializer);
246+
247+
$driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001));
248+
if (!class_exists(Version::class)) {
249+
// This is doctrine/dbal 3.x
250+
$deadlockException = new DeadlockException($driverException, null);
251+
} else {
252+
$deadlockException = new DeadlockException('Deadlock', $driverException);
253+
}
254+
255+
$connection
256+
->expects($this->exactly(2))
257+
->method('reject')
258+
->with('1')
259+
->willReturnOnConsecutiveCalls(
260+
$this->throwException($deadlockException),
261+
true,
262+
);
263+
264+
$receiver->reject($envelope);
265+
}
266+
267+
public function testRejectThrowsRetryableExceptionAndRetriesFail()
268+
{
269+
$serializer = $this->createSerializer();
270+
$connection = $this->createMock(Connection::class);
271+
272+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
273+
$receiver = new DoctrineReceiver($connection, $serializer);
274+
275+
$driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001));
276+
if (!class_exists(Version::class)) {
277+
// This is doctrine/dbal 3.x
278+
$deadlockException = new DeadlockException($driverException, null);
279+
} else {
280+
$deadlockException = new DeadlockException('Deadlock', $driverException);
281+
}
282+
283+
$connection
284+
->expects($this->exactly(4))
285+
->method('reject')
286+
->with('1')
287+
->willThrowException($deadlockException);
288+
289+
self::expectException(TransportException::class);
290+
$receiver->reject($envelope);
291+
}
292+
293+
public function testRejectThrowsException()
294+
{
295+
$serializer = $this->createSerializer();
296+
$connection = $this->createMock(Connection::class);
297+
298+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
299+
$receiver = new DoctrineReceiver($connection, $serializer);
300+
301+
$exception = new \RuntimeException();
302+
303+
$connection
304+
->expects($this->once())
305+
->method('reject')
306+
->with('1')
307+
->willThrowException($exception);
308+
309+
self::expectException($exception::class);
310+
$receiver->reject($envelope);
311+
}
312+
131313
private function createDoctrineEnvelope(): array
132314
{
133315
return [

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Doctrine\DBAL\Platforms\MySQLPlatform;
2222
use Doctrine\DBAL\Platforms\OraclePlatform;
2323
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
24+
use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode;
2425
use Doctrine\DBAL\Query\QueryBuilder;
2526
use Doctrine\DBAL\Result;
2627
use Doctrine\DBAL\Schema\AbstractSchemaManager;
@@ -190,15 +191,22 @@ public function get(): ?array
190191
->setParameters($query->getParameters(), $query->getParameterTypes());
191192

192193
if (method_exists(QueryBuilder::class, 'forUpdate')) {
193-
$query->forUpdate();
194+
$query->forUpdate(ConflictResolutionMode::SKIP_LOCKED);
194195
}
195196

196197
$sql = $query->getSQL();
197198
} elseif (method_exists(QueryBuilder::class, 'forUpdate')) {
198-
$query->forUpdate();
199+
$query->forUpdate(ConflictResolutionMode::SKIP_LOCKED);
199200
try {
200201
$sql = $query->getSQL();
201202
} catch (DBALException $e) {
203+
// If SKIP_LOCKED is not supported, fallback to without SKIP_LOCKED
204+
$query->forUpdate();
205+
206+
try {
207+
$sql = $query->getSQL();
208+
} catch (DBALException $e) {
209+
}
202210
}
203211
} elseif (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) {
204212
$fromClause = $matches[1];

0 commit comments

Comments
 (0)
0