8000 minor #52639 [Messenger] Add `SKIP LOCKED` to the query that retrieve… · symfony/symfony@06a85e0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 06a85e0

Browse files
minor #52639 [Messenger] Add SKIP LOCKED to the query that retrieves messages (hgraca)
This PR was squashed before being merged into the 7.1 branch. Discussion ---------- [Messenger] Add `SKIP LOCKED` to the query that retrieves messages | Q | A | ------------- | --- | Branch? | 7.1 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | - | License | MIT The current `SELECT ... FOR UPDATE` retrieves rows, using the index to find and lock the retrieved rows. This means that when we have more than one consumer, while one consumer is locking those rows, the whole index is locked thus other queries (consumers) will be put on hold. While with a small table this might not be noticeable, as the table grows the meddling with the index becomes slower and the other consumers have to wait more time, eventually making the MQ inoperable. The `SKIP LOCKED` addition will allow other consumers to query the table and get messages immediately, ignoring the rows that other consumers are locking. Commits ------- b4fe683 [Messenger] Add `SKIP LOCKED` to the query that retrieves messages
2 parents 79f841c + b4fe683 commit 06a85e0

File tree

3 files changed

+123
-30
lines changed

3 files changed

+123
-30
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.1
5+
---
6+
7+
* Use `SKIP LOCKED` in the doctrine transport for MySQL, PostgreSQL and MSSQL
8+
49
5.1.0
510
-----
611

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php

Lines changed: 82 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414
use Doctrine\DBAL\Connection as DBALConnection;
1515
use Doctrine\DBAL\Exception as DBALException;
1616
use Doctrine\DBAL\Platforms\AbstractPlatform;
17+
use Doctrine\DBAL\Platforms\MariaDb1060Platform;
1718
use Doctrine\DBAL\Platforms\MariaDBPlatform;
1819
use Doctrine\DBAL\Platforms\MySQL57Platform;
20+
use Doctrine\DBAL\Platforms\MySQL80Platform;
1921
use Doctrine\DBAL\Platforms\MySQLPlatform;
2022
use Doctrine\DBAL\Platforms\OraclePlatform;
23+
use Doctrine\DBAL\Platforms\PostgreSQL100Platform;
24+
use Doctrine\DBAL\Platforms\PostgreSQL94Platform;
25+
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
2126
use Doctrine\DBAL\Platforms\SQLServer2012Platform;
2227
use Doctrine\DBAL\Platforms\SQLServerPlatform;
2328
use Doctrine\DBAL\Query\QueryBuilder;
@@ -391,28 +396,94 @@ class_exists(MySQLPlatform::class) ? new MySQLPlatform() : new MySQL57Platform()
391396
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
392397
];
393398

399+
if (class_exists(MySQL80Platform::class) && !method_exists(QueryBuilder::class, 'forUpdate')) {
400+
yield 'MySQL8 & DBAL<3.8' => [
401+
new MySQL80Platform(),
402+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
403+
];
404+
}
405+
406+
if (class_exists(MySQL80Platform::class) && method_exists(QueryBuilder::class, 'forUpdate')) {
407+
yield 'MySQL8 & DBAL>=3.8' => [
408+
new MySQL80Platform(),
409+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED',
410+
];
411+
}
412+
394413
yield 'MariaDB' => [
395414
new MariaDBPlatform(),
396415
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
397416
];
398417

399-
yield 'SQL Server' => [
400-
class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ? new SQLServerPlatform() : new SQLServer2012Platform(),
401-
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
402-
];
418+
if (class_exists(MariaDb1060Platform::class)) {
419+
yield 'MariaDB106' => [
420+
new MariaDb1060Platform(),
421+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED',
422+
];
423+
}
403424

404-
if (!class_exists(MySQL57Platform::class)) {
405-
// DBAL >= 4
406-
yield 'Oracle' => [
407-
new OraclePlatform(),
408-
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE',
425+
if (class_exists(MySQL57Platform::class)) {
426+
yield 'Postgres & DBAL<4' => [
427+
new PostgreSQLPlatform(),
428+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
409429
];
410430
} else {
411-
// DBAL < 4
412-
yield 'Oracle' => [
431+
yield 'Postgres & DBAL>=4' => [
432+
new PostgreSQLPlatform(),
433+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED',
434+
];
435+
}
436+
437+
if (class_exists(PostgreSQL94Platform::class)) {
438+
yield 'Postgres94' => [
439+
new PostgreSQL94Platform(),
440+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
441+
];
442+
}
443+
444+
if (class_exists(PostgreSQL100Platform::class) && !method_exists(QueryBuilder::class, 'forUpdate')) {
445+
yield 'Postgres10 & DBAL<3.8' => [
446+
new PostgreSQL100Platform(),
447+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
448+
];
449+
}
450+
451+
if (class_exists(PostgreSQL100Platform::class) && method_exists(QueryBuilder::class, 'forUpdate')) {
452+
yield 'Postgres10 & DBAL>=3.8' => [
453+
new PostgreSQL100Platform(),
454+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED',
455+
];
456+
}
457+
458+
if (!method_exists(QueryBuilder::class, 'forUpdate')) {
459+
yield 'SQL Server & DBAL<3.8' => [
460+
class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ? new SQLServerPlatform() : new SQLServer2012Platform(),
461+
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
462+
];
463+
}
464+
465+
if (method_exists(QueryBuilder::class, 'forUpdate')) {
466+
yield 'SQL Server & DBAL>=3.8' => [
467+
class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ? new SQLServerPlatform() : new SQLServer2012Platform(),
468+
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK, READPAST) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
469+
];
470+
}
471+
472+
if (!method_exists(QueryBuilder::class, 'forUpdate')) {
473+
yield 'Oracle & DBAL<3.8' => [
413474
new OraclePlatform(),
414475
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE',
415476
];
477+
} elseif (class_exists(MySQL57Platform::class)) {
478+
yield 'Oracle & 3.8<=DBAL<4' => [
479+
new OraclePlatform(),
480+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE SKIP LOCKED',
481+
];
482+
} else {
483+
yield 'Oracle & DBAL>=4' => [
484+
new OraclePlatform(),
485+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE SKIP LOCKED',
486+
];
416487
}
417488
}
418489

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Doctrine\DBAL\LockMode;
1919
use Doctrine\DBAL\Platforms\AbstractMySQLPlatform;
2020
use Doctrine\DBAL\Platforms\OraclePlatform;
21+
use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode;
2122
use Doctrine\DBAL\Query\QueryBuilder;
2223
use Doctrine\DBAL\Result;
2324
use Doctrine\DBAL\Schema\Schema;
@@ -32,6 +33,8 @@
3233
*
3334
* @author Vincent Touzet <vincent.touzet@gmail.com>
3435
* @author Kévin Dunglas <dunglas@gmail.com>
36+
* @author Herberto Graca <herberto.graca@gmail.com>
37+
* @author Alexander Malyk <shu.rick.ifmo@gmail.com>
3538
*/
3639
class Connection implements ResetInterface
3740
{
@@ -178,28 +181,22 @@ public function get(): ?array
178181
->where('w.id IN ('.str_replace('SELECT a.* FROM', 'SELECT a.id FROM', $sql).')')
179182
->setParameters($query->getParameters());
180183

181-
if (method_exists(QueryBuilder::class, 'forUpdate')) {
182-
$query->forUpdate();
183-
}
184-
185184
$sql = $query->getSQL();
186-
} elseif (method_exists(QueryBuilder::class, 'forUpdate')) {
187-
$query->forUpdate();
188-
try {
189-
$sql = $query->getSQL();
190-
} catch (DBALException $e) {
191-
}
192-
} elseif (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) {
193-
$fromClause = $matches[1];
194-
$sql = str_replace(
195-
sprintf('FROM %s WHERE', $fromClause),
196-
sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)),
197-
$sql
198-
);
199185
}
200186

201-
// use SELECT ... FOR UPDATE to lock table
202-
if (!method_exists(QueryBuilder::class, 'forUpdate')) {
187+
if (method_exists(QueryBuilder::class, 'forUpdate')) {
188+
$sql = $this->addLockMode($query, $sql);
189+
} else {
190+
if (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) {
191+
$fromClause = $matches[1];
192+
$sql = str_replace(
193+
sprintf('FROM %s WHERE', $fromClause),
194+
sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)),
195+
$sql
196+
);
197+
}
198+
199+
// use SELECT ... FOR UPDATE to lock table
203200
$sql .= ' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL();
204201
}
205202

@@ -493,4 +490,24 @@ private function updateSchema(): void
493490
}
494491
}
495492
}
493+
494+
private function addLockMode(QueryBuilder $query, string $sql): string
495+
{
496+
$query->forUpdate(ConflictResolutionMode::SKIP_LOCKED);
497+
try {
498+
return $query->getSQL();
499+
} catch (DBALException) {
500+
return $this->fallBackToForUpdate($query, $sql);
501+
}
502+
}
503+
504+
private function fallBackToForUpdate(QueryBuilder $query, string $sql): string
505+
{
506+
$query->forUpdate();
507+
try {
508+
return $query->getSQL();
509+
} catch (DBALException) {
510+
return $sql;
511+
}
512+
}
496513
}

0 commit comments

Comments
 (0)
0