From 4ad810152199ee6975d314265a39db7502b121e3 Mon Sep 17 00:00:00 2001 From: gstapinato Date: Thu, 9 Feb 2023 10:46:40 -0300 Subject: [PATCH] [Messenger] Fix warning message on failed messenger show command --- .../Tests/Transport/ConnectionTest.php | 54 ++++++++++++++++++- .../Bridge/Doctrine/Transport/Connection.php | 44 +++++++++------ 2 files changed, 80 insertions(+), 18 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php index 7e576a5580dff..baf639cc62872 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php @@ -401,7 +401,7 @@ public function providePlatformSql(): iterable yield 'Oracle' => [ new OraclePlatform(), - 'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN(SELECT a.id FROM (SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE', + 'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE', ]; } @@ -437,4 +437,56 @@ public function testConfigureSchemaTableExists() $table = $schema->getTable('messenger_messages'); $this->assertEmpty($table->getColumns(), 'The table was not overwritten'); } + + /** + * @dataProvider provideFindAllSqlGeneratedByPlatform + */ + public function testFindAllSqlGenerated(AbstractPlatform $platform, string $expectedSql) + { + $driverConnection = $this->createMock(DBALConnection::class); + $driverConnection->method('getDatabasePlatform')->willReturn($platform); + $driverConnection->method('createQueryBuilder')->willReturnCallback(function () use ($driverConnection) { + return new QueryBuilder($driverConnection); + }); + + if (interface_exists(DriverResult::class)) { + $result = $this->createMock(DriverResult::class); + $result->method('fetchAssociative')->willReturn(false); + + if (class_exists(Result::class)) { + $result = new Result($result, $driverConnection); + } + } else { + $result = $this->createMock(ResultStatement::class); + $result->method('fetch')->willReturn(false); + } + + $driverConnection + ->expects($this->once()) + ->method('executeQuery') + ->with($expectedSql) + ->willReturn($result) + ; + + $connection = new Connection([], $driverConnection); + $connection->findAll(50); + } + + public function provideFindAllSqlGeneratedByPlatform(): iterable + { + yield 'MySQL' => [ + new MySQL57Platform(), + 'SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) LIMIT 50', + ]; + + yield 'SQL Server' => [ + new SQLServer2012Platform(), + 'SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY (SELECT 0) OFFSET 0 ROWS FETCH NEXT 50 ROWS ONLY', + ]; + + yield 'Oracle' => [ + new OraclePlatform(), + 'SELECT a.* FROM (SELECT m.id AS "id", m.body AS "body", m.headers AS "headers", m.queue_name AS "queue_name", m.created_at AS "created_at", m.available_at AS "available_at", m.delivered_at AS "delivered_at" FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?)) a WHERE ROWNUM <= 50', + ]; + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php index b760df6949092..99fb8b3641339 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php @@ -171,6 +171,10 @@ public function get(): ?array ->orderBy('available_at', 'ASC') ->setMaxResults(1); + if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { + $query->select('m.id'); + } + // Append pessimistic write lock to FROM clause if db platform supports it $sql = $query->getSQL(); if (($fromPart = $query->getQueryPart('from')) && @@ -187,18 +191,9 @@ public function get(): ?array // Wrap the rownum query in a sub-query to allow writelocks without ORA-02014 error if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { - $sql = str_replace('SELECT a.* FROM', 'SELECT a.id FROM', $sql); - - $wrappedQuery = $this->driverConnection->createQueryBuilder() - ->select( - 'w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", '. - 'w.created_at AS "created_at", w.available_at AS "available_at", '. - 'w.delivered_at AS "delivered_at"' - ) - ->from($this->configuration['table_name'], 'w') - ->where('w.id IN('.$sql.')'); - - $sql = $wrappedQuery->getSQL(); + $sql = $this->createQueryBuilder('w') + ->where('w.id IN ('.str_replace('SELECT a.* FROM', 'SELECT a.id FROM', $sql).')') + ->getSQL(); } // use SELECT ... FOR UPDATE to lock table @@ -287,7 +282,7 @@ public function setup(): void public function getMessageCount(): int { $queryBuilder = $this->createAvailableMessagesQueryBuilder() - ->select('COUNT(m.id) as message_count') + ->select('COUNT(m.id) AS message_count') ->setMaxResults(1); $stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes()); @@ -298,6 +293,7 @@ public function getMessageCount(): int public function findAll(int $limit = null): array { $queryBuilder = $this->createAvailableMessagesQueryBuilder(); + if (null !== $limit) { $queryBuilder->setMaxResults($limit); } @@ -365,11 +361,25 @@ private function createAvailableMessagesQueryBuilder(): QueryBuilder ]); } - private function createQueryBuilder(): QueryBuilder + private function createQueryBuilder(string $alias = 'm'): QueryBuilder { - return $this->driverConnection->createQueryBuilder() - ->select('m.*') - ->from($this->configuration['table_name'], 'm'); + $queryBuilder = $this->driverConnection->createQueryBuilder() + ->from($this->configuration['table_name'], $alias); + + $alias .= '.'; + + if (!$this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { + return $queryBuilder->select($alias.'*'); + } + + // Oracle databases use UPPER CASE on tables and column identifiers. + // Column alias is added to force the result to be lowercase even when the actual field is all caps. + + return $queryBuilder->select(str_replace(', ', ', '.$alias, + $alias.'id AS "id", body AS "body", headers AS "headers", queue_name AS "queue_name", '. + 'created_at AS "created_at", available_at AS "available_at", '. + 'delivered_at AS "delivered_at"' + )); } private function executeQuery(string $sql, array $parameters = [], array $types = [])