8000 [Messenger] Add mysql indexes back and work around deadlocks using soft-delete by nicolas-grekas · Pull Request #45888 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Add mysql indexes back and work around deadlocks using soft-delete #45888

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[Messenger] Add mysql indexes back and work around deadlocks using so…
…ft-delete
  • Loading branch information
nicolas-grekas committed Mar 30, 2022
commit 12271a44cfe0b4ea7fdcbfc20a1ebf588aa54410
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Result as DriverResult;
Expand All @@ -24,11 +23,8 @@
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\SchemaConfig;
use Doctrine\DBAL\Schema\TableDiff;
use Doctrine\DBAL\Statement;
use Doctrine\DBAL\Types\Types;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
Expand Down Expand Up @@ -410,58 +406,4 @@ public function providePlatformSql(): iterable
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
];
}

/**
* @dataProvider setupIndicesProvider
*/
public function testSetupIndices(string $platformClass, array $expectedIndices)
{
$driverConnection = $this->createMock(DBALConnection::class);
$driverConnection->method('getConfiguration')->willReturn(new Configuration());

$schemaManager = $this->createMock(AbstractSchemaManager::class);
$schema = new Schema();
$expectedTable = $schema->createTable('messenger_messages');
$expectedTable->addColumn('id', Types::BIGINT);
$expectedTable->setPrimaryKey(['id']);
// Make sure columns for indices exists so addIndex() will not throw
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
$expectedTable->addColumn($columnName, Types::STRING);
}
foreach ($expectedIndices as $indexColumns) {
$expectedTable->addIndex($indexColumns);
}
$schemaManager->method('createSchema')->willReturn($schema);
if (method_exists(DBALConnection::class, 'createSchemaManager')) {
$driverConnection->method('createSchemaManager')->willReturn($schemaManager);
} else {
$driverConnection->method('getSchemaManager')->willReturn($schemaManager);
}

$platformMock = $this->createMock($platformClass);
$platformMock
->expects(self::once())
->method('getAlterTableSQL')
->with(self::callback(static function (TableDiff $tableDiff): bool {
return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes);
}))
->willReturn([]);
$driverConnection->method('getDatabasePlatform')->willReturn($platformMock);

$connection = new Connection([], $driverConnection);
$connection->setup();
}

public function setupIndicesProvider(): iterable
{
yield 'MySQL' => [
MySQL57Platform::class,
[['delivered_at']],
];

yield 'Other platforms' => [
AbstractPlatform::class,
[['queue_name'], ['available_at'], ['delivered_at']],
];
}
}
25 changes: 20 additions & 5 deletions src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Exception as DriverException;
use Doctrine\DBAL\Driver\Result as DriverResult;
use Doctrine\DBAL\Exception;
use Doctrine\DBAL\Exception\TableNotFoundException;
Expand Down Expand Up @@ -157,6 +158,14 @@ public function send(string $body, array $headers, int $delay = 0): string

public function get(): ?array
{
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
try {
$this->driverConnection->delete($this->configuration['table_name'], ['delivered_at' => '9999-12-31']);
} catch (DriverException $e) {
// Ignore the exception
}
}

get:
$this->driverConnection->beginTransaction();
try {
Expand Down Expand Up @@ -224,6 +233,10 @@ public function get(): ?array
public function ack(string $id): bool
{
try {
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31'], ['id' => $id]) > 0;
}

return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
} catch (DBALException|Exception $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
Expand All @@ -233,6 +246,10 @@ public function ack(string $id): bool
public function reject(string $id): bool
{
try {
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31'], ['id' => $id]) > 0;
}

return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
} catch (DBALException|Exception $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
Expand Down Expand Up @@ -388,6 +405,7 @@ private function getSchema(): Schema
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
->setNotnull(true);
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
->setNotnull(true);
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
->setNotnull(true);
Expand All @@ -396,11 +414,8 @@ private function getSchema(): Schema
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
->setNotnull(false);
$table->setPrimaryKey(['id']);
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
if (!$this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
}
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
$table->addIndex(['delivered_at']);

return $schema;
Expand Down
0