diff --git a/.github/workflows/check-subtree-split.yml b/.github/workflows/check-subtree-split.yml deleted file mode 100644 index 16be48b..0000000 --- a/.github/workflows/check-subtree-split.yml +++ /dev/null @@ -1,37 +0,0 @@ -name: Check subtree split - -on: - pull_request_target: - -jobs: - close-pull-request: - runs-on: ubuntu-latest - - steps: - - name: Close pull request - uses: actions/github-script@v6 - with: - script: | - if (context.repo.owner === "symfony") { - github.rest.issues.createComment({ - owner: "symfony", - repo: context.repo.repo, - issue_number: context.issue.number, - body: ` - Thanks for your Pull Request! We love contributions. - - However, you should instead open your PR on the main repository: - https://github.com/symfony/symfony - - This repository is what we call a "subtree split": a read-only subset of that main repository. - We're looking forward to your PR there! - ` - }); - - github.rest.pulls.update({ - owner: "symfony", - repo: context.repo.repo, - pull_number: context.issue.number, - state: "closed" - }); - } diff --git a/.github/workflows/close-pull-request.yml b/.github/workflows/close-pull-request.yml new file mode 100644 index 0000000..e55b478 --- /dev/null +++ b/.github/workflows/close-pull-request.yml @@ -0,0 +1,20 @@ +name: Close Pull Request + +on: + pull_request_target: + types: [opened] + +jobs: + run: + runs-on: ubuntu-latest + steps: + - uses: superbrothers/close-pull-request@v3 + with: + comment: | + Thanks for your Pull Request! We love contributions. + + However, you should instead open your PR on the main repository: + https://github.com/symfony/symfony + + This repository is what we call a "subtree split": a read-only subset of that main repository. + We're looking forward to your PR there! diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php index 3544a03..a1a7b56 100644 --- a/Tests/Transport/ConnectionTest.php +++ b/Tests/Transport/ConnectionTest.php @@ -37,8 +37,8 @@ class ConnectionTest extends TestCase { public function testGetAMessageWillChangeItsStatus() { - $queryBuilder = $this->getQueryBuilderMock(); - $driverConnection = $this->getDBALConnectionMock(); + $queryBuilder = $this->getQueryBuilderStub(); + $driverConnection = $this->getDBALConnection(); $stmt = $this->getResultMock([ 'id' => 1, 'body' => '{"message":"Hi"}', @@ -73,8 +73,8 @@ public function testGetAMessageWillChangeItsStatus() public function testGetWithNoPendingMessageWillReturnNull() { - $queryBuilder = $this->getQueryBuilderMock(); - $driverConnection = $this->getDBALConnectionMock(); + $queryBuilder = $this->getQueryBuilderStub(); + $driverConnection = $this->getDBALConnection(true); $stmt = $this->getResultMock(false); $queryBuilder @@ -106,8 +106,8 @@ public function testGetWithSkipLockedWithForUpdateMethod() $this->markTestSkipped('This test is for when forUpdate method exists.'); } - $queryBuilder = $this->getQueryBuilderMock(); - $driverConnection = $this->getDBALConnectionMock(); + $queryBuilder = $this->getQueryBuilderStub(); + $driverConnection = $this->getDBALConnection(true); $stmt = $this->getResultMock(false); $queryBuilder @@ -130,10 +130,12 @@ public function testGetWithSkipLockedWithForUpdateMethod() ->method('update'); $driverConnection ->method('executeQuery') - ->with($this->callback(function ($sql) { - return str_contains($sql, 'SKIP LOCKED'); - })) - ->willReturn($stmt); + ->willReturnCallback(function (string $sql) use ($stmt) { + $this->assertStringContainsString('SKIP LOCKED', $sql); + + return $stmt; + }) + ; $connection = new Connection(['skip_locked' => true], $driverConnection); $doctrineEnvelope = $connection->get(); @@ -146,8 +148,8 @@ public function testGetWithSkipLockedWithoutForUpdateMethod() $this->markTestSkipped('This test is for when forUpdate method does not exist.'); } - $queryBuilder = $this->getQueryBuilderMock(); - $driverConnection = $this->getDBALConnectionMock(); + $queryBuilder = $this->getQueryBuilderStub(); + $driverConnection = $this->getDBALConnection(); $stmt = $this->getResultMock(false); $queryBuilder @@ -179,7 +181,7 @@ public function testGetWithSkipLockedWithoutForUpdateMethod() public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() { $this->expectException(TransportException::class); - $driverConnection = $this->getDBALConnectionMock(); + $driverConnection = $this->getDBALConnection(); $driverConnection->method('delete')->willThrowException($this->createStub(DBALException::class)); $connection = new Connection([], $driverConnection); @@ -189,7 +191,7 @@ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() public function testItThrowsATransportExceptionIfItCannotRejectMessage() { $this->expectException(TransportException::class); - $driverConnection = $this->getDBALConnectionMock(); + $driverConnection = $this->getDBALConnection(); $driverConnection->method('delete')->willThrowException($this->createStub(DBALException::class)); $connection = new Connection([], $driverConnection); @@ -199,7 +201,7 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage() public function testSend() { $queryBuilder = $this->getQueryBuilderMock(); - $driverConnection = $this->getDBALConnectionMock(); + $driverConnection = $this->getDBALConnection(true); $driverConnection->expects($this->once()) ->method('createQueryBuilder') @@ -248,7 +250,7 @@ public function testSend() public function testSendLastInsertIdReturnsInteger() { $queryBuilder = $this->getQueryBuilderMock(); - $driverConnection = $this->getDBALConnectionMock(); + $driverConnection = $this->getDBALConnection(true); $driverConnection->expects($this->once()) ->method('createQueryBuilder') @@ -294,21 +296,21 @@ public function testSendLastInsertIdReturnsInteger() self::assertSame('1', $id); } - private function getDBALConnectionMock() + private function getDBALConnection(bool $mock = false) { - $driverConnection = $this->createMock(DBALConnection::class); - $platform = $this->createMock(AbstractPlatform::class); + $driverConnection = $mock ? $this->createMock(DBALConnection::class) : $this->createStub(DBALConnection::class); + $platform = $this->createStub(AbstractPlatform::class); if (!method_exists(QueryBuilder::class, 'forUpdate')) { $platform->method('getWriteLockSQL')->willReturn('FOR UPDATE SKIP LOCKED'); } - $configuration = $this->createMock(\Doctrine\DBAL\Configuration::class); + $configuration = $this->createStub(\Doctrine\DBAL\Configuration::class); $driverConnection->method('getDatabasePlatform')->willReturn($platform); $driverConnection->method('getConfiguration')->willReturn($configuration); - $schemaManager = $this->createMock(AbstractSchemaManager::class); - $schemaConfig = $this->createMock(SchemaConfig::class); + $schemaManager = $this->createStub(AbstractSchemaManager::class); + $schemaConfig = $this->createStub(SchemaConfig::class); $schemaConfig->method('getMaxIdentifierLength')->willReturn(63); $schemaConfig->method('getDefaultTableOptions')->willReturn([]); $schemaManager->method('createSchemaConfig')->willReturn($schemaConfig); @@ -339,6 +341,24 @@ private function getQueryBuilderMock() return $queryBuilder; } + private function getQueryBuilderStub() + { + $queryBuilder = $this->createStub(QueryBuilder::class); + + $queryBuilder->method('select')->willReturn($queryBuilder); + $queryBuilder->method('update')->willReturn($queryBuilder); + $queryBuilder->method('from')->willReturn($queryBuilder); + $queryBuilder->method('set')->willReturn($queryBuilder); + $queryBuilder->method('where')->willReturn($queryBuilder); + $queryBuilder->method('andWhere')->willReturn($queryBuilder); + $queryBuilder->method('orderBy')->willReturn($queryBuilder); + $queryBuilder->method('setMaxResults')->willReturn($queryBuilder); + $queryBuilder->method('setParameter')->willReturn($queryBuilder); + $queryBuilder->method('setParameters')->willReturn($queryBuilder); + + return $queryBuilder; + } + private function getResultMock($expectedResult) { $stmt = $this->createMock(class_exists(Result::class) ? Result::class : ResultStatement::class); @@ -452,8 +472,8 @@ public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN() public function testFind() { - $queryBuilder = $this->getQueryBuilderMock(); - $driverConnection = $this->getDBALConnectionMock(); + $queryBuilder = $this->getQueryBuilderStub(); + $driverConnection = $this->getDBALConnection(); $id = 1; $stmt = $this->getResultMock([ 'id' => $id, @@ -487,8 +507,8 @@ public function testFind() public function testFindAll() { - $queryBuilder = $this->getQueryBuilderMock(); - $driverConnection = $this->getDBALConnectionMock(); + $queryBuilder = $this->getQueryBuilderStub(); + $driverConnection = $this->getDBALConnection(); $message1 = [ 'id' => 1, 'body' => '{"message":"Hi"}', @@ -546,10 +566,10 @@ public function testGeneratedSql(AbstractPlatform $platform, string $expectedSql $driverConnection->method('createQueryBuilder')->willReturnCallback(fn () => new QueryBuilder($driverConnection)); if (class_exists(Result::class)) { - $result = $this->createMock(Result::class); + $result = $this->createStub(Result::class); $result->method('fetchAssociative')->willReturn(false); } else { - $result = $this->createMock(ResultStatement::class); + $result = $this->createStub(ResultStatement::class); $result->method('fetch')->willReturn(false); } @@ -584,7 +604,7 @@ class_exists(MySQLPlatform::class) ? new MySQLPlatform() : new MySQL57Platform() yield 'SQL Server' => [ class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ? new SQLServerPlatform() : new SQLServer2012Platform(), - sprintf('SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK%s) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ', method_exists(QueryBuilder::class, 'forUpdate') ? ', READPAST' : ''), + \sprintf('SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK%s) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ', method_exists(QueryBuilder::class, 'forUpdate') ? ', READPAST' : ''), ]; if (!class_exists(MySQL57Platform::class)) { @@ -597,25 +617,41 @@ class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::c // DBAL < 4 yield 'Oracle' => [ new OraclePlatform(), - sprintf('SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE%s', method_exists(QueryBuilder::class, 'forUpdate') ? ' SKIP LOCKED' : ''), + \sprintf('SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE%s', method_exists(QueryBuilder::class, 'forUpdate') ? ' SKIP LOCKED' : ''), ]; } } public function testConfigureSchema() { - $driverConnection = $this->getDBALConnectionMock(); + $driverConnection = $this->getDBALConnection(); $schema = new Schema(); $connection = new Connection(['table_name' => 'queue_table'], $driverConnection); $connection->configureSchema($schema, $driverConnection, fn () => true); $this->assertTrue($schema->hasTable('queue_table')); + + // Ensure the covering index for the SELECT query exists + $table = $schema->getTable('queue_table'); + $hasCoveringIndex = false; + foreach ($table->getIndexes() as $index) { + // Doctrine DBAL 4+: use getIndexedColumns(); fallback to getColumns() for older versions + $columns = method_exists($index, 'getIndexedColumns') + ? array_map(static fn ($ic) => $ic->getColumnName()->toString(), $index->getIndexedColumns()) + : $index->getColumns(); + + if ($columns === ['queue_name', 'available_at', 'delivered_at', 'id']) { + $hasCoveringIndex = true; + break; + } + } + $this->assertTrue($hasCoveringIndex, 'Expected covering index on [queue_name, available_at, delivered_at, id] not found'); } public function testConfigureSchemaDifferentDbalConnection() { - $driverConnection = $this->getDBALConnectionMock(); - $driverConnection2 = $this->getDBALConnectionMock(); + $driverConnection = $this->getDBALConnection(); + $driverConnection2 = $this->getDBALConnection(); $schema = new Schema(); $connection = new Connection([], $driverConnection); @@ -625,7 +661,7 @@ public function testConfigureSchemaDifferentDbalConnection() public function testConfigureSchemaTableExists() { - $driverConnection = $this->getDBALConnectionMock(); + $driverConnection = $this->getDBALConnection(); $schema = new Schema(); $schema->createTable('messenger_messages'); @@ -647,10 +683,10 @@ public function testFindAllSqlGenerated(AbstractPlatform $platform, string $expe }); if (class_exists(Result::class)) { - $result = $this->createMock(Result::class); + $result = $this->createStub(Result::class); $result->method('fetchAllAssociative')->willReturn([]); } else { - $result = $this->createMock(ResultStatement::class); + $result = $this->createStub(ResultStatement::class); $result->method('fetchAll')->willReturn([]); } @@ -665,7 +701,7 @@ public function testFindAllSqlGenerated(AbstractPlatform $platform, string $expe $connection->findAll(50); } - public function provideFindAllSqlGeneratedByPlatform(): iterable + public static function provideFindAllSqlGeneratedByPlatform(): iterable { yield 'MySQL' => [ class_exists(MySQLPlatform::class) ? new MySQLPlatform() : new MySQL57Platform(), @@ -698,4 +734,21 @@ class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::c ]; } } + + public function testConfigureSchemaOracleSequenceNameSuffixed() + { + $driverConnection = $this->createStub(DBALConnection::class); + $driverConnection->method('getDatabasePlatform')->willReturn(new OraclePlatform()); + $schema = new Schema(); + + $connection = new Connection(['table_name' => 'messenger_messages'], $driverConnection); + $connection->configureSchema($schema, $driverConnection, fn () => true); + + $expectedSuffix = '_seq'; + $sequences = $schema->getSequences(); + $this->assertCount(1, $sequences); + $sequence = array_pop($sequences); + $sequenceNameSuffix = substr($sequence->getName(), -\strlen($expectedSuffix)); + $this->assertSame($expectedSuffix, $sequenceNameSuffix); + } } diff --git a/Tests/Transport/DoctrinePostgreSqlFilterIntegrationTest.php b/Tests/Transport/DoctrinePostgreSqlFilterIntegrationTest.php new file mode 100644 index 0000000..9a4738b --- /dev/null +++ b/Tests/Transport/DoctrinePostgreSqlFilterIntegrationTest.php @@ -0,0 +1,119 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; + +use Doctrine\DBAL\Configuration; +use Doctrine\DBAL\Connection; +use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Schema\Column; +use Doctrine\DBAL\Schema\DefaultSchemaManagerFactory; +use Doctrine\DBAL\Schema\Sequence; +use Doctrine\DBAL\Schema\Table; +use Doctrine\DBAL\Tools\DsnParser; +use Doctrine\DBAL\Types\Type; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection; + +/** + * This test checks on a postgres connection whether the doctrine asset filter works as expected. + * + * @requires extension pdo_pgsql + * + * @group integration + */ +class DoctrinePostgreSqlFilterIntegrationTest extends TestCase +{ + private Connection $driverConnection; + + protected function setUp(): void + { + if (!$host = getenv('POSTGRES_HOST')) { + $this->markTestSkipped('Missing POSTGRES_HOST env variable'); + } + + $url = "pdo-pgsql://postgres:password@$host"; + $params = (new DsnParser())->parse($url); + $config = new Configuration(); + if (class_exists(DefaultSchemaManagerFactory::class)) { + $config->setSchemaManagerFactory(new DefaultSchemaManagerFactory()); + } + + $this->driverConnection = DriverManager::getConnection($params, $config); + + $this->createAssets(); + } + + protected function tearDown(): void + { + $this->removeAssets(); + + $this->driverConnection->close(); + } + + public function testFilterAssets() + { + $schemaManager = $this->driverConnection->createSchemaManager(); + + $this->assertFalse($schemaManager->tablesExist(['queue_table'])); + $this->assertTrue($schemaManager->tablesExist(['app_table'])); + $this->assertTrue($this->hasSequence('app_table_id')); + + $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $this->driverConnection); + $connection->setup(); + + $schemaManager = $this->driverConnection->createSchemaManager(); + + $this->assertTrue($schemaManager->tablesExist(['queue_table'])); + $this->assertTrue($schemaManager->tablesExist(['app_table'])); + $this->assertTrue($this->hasSequence('app_table_id')); + } + + private function createAssets(): void + { + $this->removeAssets(); + + $schemaManager = $this->driverConnection->createSchemaManager(); + $schemaManager->createTable(new Table('app_table', [new Column('id', Type::getType('integer'))])); + $schemaManager->createSequence(new Sequence('app_table_id')); + } + + private function removeAssets(): void + { + $schemaManager = $this->driverConnection->createSchemaManager(); + + if ($schemaManager->tablesExist(['queue_table'])) { + $schemaManager->dropTable('queue_table'); + } + + if ($schemaManager->tablesExist(['app_table'])) { + $schemaManager->dropTable('app_table'); + } + + if ($this->hasSequence('app_table_id')) { + $schemaManager->dropSequence('app_table_id'); + } + } + + private function hasSequence(string $name): bool + { + $schemaManager = $this->driverConnection->createSchemaManager(); + + $sequences = $schemaManager->listSequences(); + foreach ($sequences as $sequence) { + if ($sequence->getName() === $name) { + return true; + } + } + + return false; + } +} diff --git a/Tests/Transport/DoctrineReceiverTest.php b/Tests/Transport/DoctrineReceiverTest.php index 36ee145..d5dc4ea 100644 --- a/Tests/Transport/DoctrineReceiverTest.php +++ b/Tests/Transport/DoctrineReceiverTest.php @@ -28,6 +28,8 @@ use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer; +use Symfony\Component\Serializer\Normalizer\DateTimeNormalizer; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; class DoctrineReceiverTest extends TestCase @@ -37,7 +39,7 @@ public function testItReturnsTheDecodedMessageToTheHandler() $serializer = $this->createSerializer(); $doctrineEnvelope = $this->createDoctrineEnvelope(); - $connection = $this->createMock(Connection::class); + $connection = $this->createStub(Connection::class); $connection->method('get')->willReturn($doctrineEnvelope); $receiver = new DoctrineReceiver($connection, $serializer); @@ -45,7 +47,7 @@ public function testItReturnsTheDecodedMessageToTheHandler() $this->assertCount(1, $actualEnvelopes); /** @var Envelope $actualEnvelope */ $actualEnvelope = $actualEnvelopes[0]; - $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); /** @var DoctrineReceivedStamp $doctrineReceivedStamp */ $doctrineReceivedStamp = $actualEnvelope->last(DoctrineReceivedStamp::class); @@ -61,7 +63,7 @@ public function testItReturnsTheDecodedMessageToTheHandler() public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException() { $this->expectException(MessageDecodingFailedException::class); - $serializer = $this->createMock(PhpSerializer::class); + $serializer = $this->createStub(PhpSerializer::class); $serializer->method('decode')->willThrowException(new MessageDecodingFailedException()); $doctrineEnvelop = $this->createDoctrineEnvelope(); @@ -76,7 +78,7 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException() public function testOccursRetryableExceptionFromConnection() { $serializer = $this->createSerializer(); - $connection = $this->createMock(Connection::class); + $connection = $this->createStub(Connection::class); $driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001)); if (!class_exists(Version::class)) { // This is doctrine/dbal 3.x @@ -100,13 +102,30 @@ public function testOccursRetryableExceptionFromConnection() $receiver->get(); } + public function testGetReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createStub(Connection::class); + $connection->method('get')->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->get(); + /** @var Envelope $actualEnvelope */ + $actualEnvelope = $actualEnvelopes[0]; + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + public function testAll() { $serializer = $this->createSerializer(); $doctrineEnvelope1 = $this->createDoctrineEnvelope(); $doctrineEnvelope2 = $this->createDoctrineEnvelope(); - $connection = $this->createMock(Connection::class); + $connection = $this->createStub(Connection::class); $connection->method('findAll')->with(50)->willReturn([$doctrineEnvelope1, $doctrineEnvelope2]); $receiver = new DoctrineReceiver($connection, $serializer); @@ -115,12 +134,30 @@ public function testAll() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } + public function testAllReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope1 = $this->createRetriedDoctrineEnvelope(); + $doctrineEnvelope2 = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createStub(Connection::class); + $connection->method('findAll')->willReturn([$doctrineEnvelope1, $doctrineEnvelope2]); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->all(); + foreach ($actualEnvelopes as $actualEnvelope) { + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + } + public function testFind() { $serializer = $this->createSerializer(); $doctrineEnvelope = $this->createDoctrineEnvelope(); - $connection = $this->createMock(Connection::class); + $connection = $this->createStub(Connection::class); $connection->method('find')->with(10)->willReturn($doctrineEnvelope); $receiver = new DoctrineReceiver($connection, $serializer); @@ -128,6 +165,21 @@ public function testFind() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); } + public function testFindReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createStub(Connection::class); + $connection->method('find')->with(3)->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelope = $receiver->find(3); + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + public function testAck() { $serializer = $this->createSerializer(); @@ -195,7 +247,7 @@ public function testAckThrowsRetryableExceptionAndRetriesFail() ->with('1') ->willThrowException($deadlockException); - self::expectException(TransportException::class); + $this->expectException(TransportException::class); $receiver->ack($envelope); } @@ -215,7 +267,7 @@ public function testAckThrowsException() ->with('1') ->willThrowException($exception); - self::expectException($exception::class); + $this->expectException($exception::class); $receiver->ack($envelope); } @@ -286,7 +338,7 @@ public function testRejectThrowsRetryableExceptionAndRetriesFail() ->with('1') ->willThrowException($deadlockException); - self::expectException(TransportException::class); + $this->expectException(TransportException::class); $receiver->reject($envelope); } @@ -306,7 +358,7 @@ public function testRejectThrowsException() ->with('1') ->willThrowException($exception); - self::expectException($exception::class); + $this->expectException($exception::class); $receiver->reject($envelope); } @@ -321,12 +373,27 @@ private function createDoctrineEnvelope(): array ]; } + private function createRetriedDoctrineEnvelope(): array + { + return [ + 'id' => 3, + 'body' => '{"message": "Hi"}', + 'headers' => [ + 'type' => DummyMessage::class, + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\BusNameStamp' => '[{"busName":"messenger.bus.default"}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\TransportMessageIdStamp' => '[{"id":1},{"id":2}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\ErrorDetailsStamp' => '[{"exceptionClass":"Symfony\\\\Component\\\\Messenger\\\\Exception\\\\RecoverableMessageHandlingException","exceptionCode":0,"exceptionMessage":"","flattenException":null}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\DelayStamp' => '[{"delay":1000},{"delay":1000}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\RedeliveryStamp' => '[{"retryCount":1,"redeliveredAt":"2025-01-05T13:58:25+00:00"},{"retryCount":2,"redeliveredAt":"2025-01-05T13:59:26+00:00"}]', + 'Content-Type' => 'application/json', + ], + ]; + } + private function createSerializer(): Serializer { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + return new Serializer( + new SerializerComponent\Serializer([new DateTimeNormalizer(), new ArrayDenormalizer(), new ObjectNormalizer()], ['json' => new JsonEncoder()]) ); - - return $serializer; } } diff --git a/Tests/Transport/DoctrineSenderTest.php b/Tests/Transport/DoctrineSenderTest.php index 8505e3d..02bce51 100644 --- a/Tests/Transport/DoctrineSenderTest.php +++ b/Tests/Transport/DoctrineSenderTest.php @@ -30,8 +30,8 @@ public function testSend() $connection = $this->createMock(Connection::class); $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'])->willReturn('15'); - $serializer = $this->createMock(SerializerInterface::class); - $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + $serializer = $this->createStub(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturn($encoded); $sender = new DoctrineSender($connection, $serializer); $actualEnvelope = $sender->send($envelope); @@ -50,8 +50,8 @@ public function testSendWithDelay() $connection = $this->createMock(Connection::class); $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500); - $serializer = $this->createMock(SerializerInterface::class); - $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + $serializer = $this->createStub(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturn($encoded); $sender = new DoctrineSender($connection, $serializer); $sender->send($envelope); diff --git a/Tests/Transport/DoctrineTransportFactoryTest.php b/Tests/Transport/DoctrineTransportFactoryTest.php index 65f9c68..499569b 100644 --- a/Tests/Transport/DoctrineTransportFactoryTest.php +++ b/Tests/Transport/DoctrineTransportFactoryTest.php @@ -32,7 +32,7 @@ class DoctrineTransportFactoryTest extends TestCase public function testSupports() { $factory = new DoctrineTransportFactory( - $this->createMock(ConnectionRegistry::class) + $this->createStub(ConnectionRegistry::class) ); $this->assertTrue($factory->supports('doctrine://default', [])); @@ -41,10 +41,10 @@ public function testSupports() public function testCreateTransport() { - $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); - $schemaManager = $this->createMock(AbstractSchemaManager::class); - $schemaConfig = $this->createMock(SchemaConfig::class); - $platform = $this->createMock(AbstractPlatform::class); + $driverConnection = $this->createStub(\Doctrine\DBAL\Connection::class); + $schemaManager = $this->createStub(AbstractSchemaManager::class); + $schemaConfig = $this->createStub(SchemaConfig::class); + $platform = $this->createStub(AbstractPlatform::class); $schemaManager->method('createSchemaConfig')->willReturn($schemaConfig); $driverConnection->method( method_exists(\Doctrine\DBAL\Connection::class, 'createSchemaManager') @@ -59,7 +59,7 @@ public function testCreateTransport() ->willReturn($driverConnection); $factory = new DoctrineTransportFactory($registry); - $serializer = $this->createMock(SerializerInterface::class); + $serializer = $this->createStub(SerializerInterface::class); $this->assertEquals( new DoctrineTransport(new Connection(PostgreSqlConnection::buildConfiguration('doctrine://default'), $driverConnection), $serializer), @@ -69,10 +69,10 @@ public function testCreateTransport() public function testCreateTransportNotifyWithPostgreSQLPlatform() { - $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); - $schemaManager = $this->createMock(AbstractSchemaManager::class); - $schemaConfig = $this->createMock(SchemaConfig::class); - $platform = $this->createMock(PostgreSQLPlatform::class); + $driverConnection = $this->createStub(\Doctrine\DBAL\Connection::class); + $schemaManager = $this->createStub(AbstractSchemaManager::class); + $schemaConfig = $this->createStub(SchemaConfig::class); + $platform = $this->createStub(PostgreSQLPlatform::class); $schemaManager->method('createSchemaConfig')->willReturn($schemaConfig); $driverConnection->method( method_exists(\Doctrine\DBAL\Connection::class, 'createSchemaManager') @@ -88,7 +88,7 @@ public function testCreateTransportNotifyWithPostgreSQLPlatform() ->willReturn($driverConnection); $factory = new DoctrineTransportFactory($registry); - $serializer = $this->createMock(SerializerInterface::class); + $serializer = $this->createStub(SerializerInterface::class); $this->assertEquals( new DoctrineTransport(new PostgreSqlConnection(PostgreSqlConnection::buildConfiguration('doctrine://default'), $driverConnection), $serializer), @@ -108,6 +108,6 @@ public function testCreateTransportMustThrowAnExceptionIfManagerIsNotFound() }); $factory = new DoctrineTransportFactory($registry); - $factory->createTransport('doctrine://default', [], $this->createMock(SerializerInterface::class)); + $factory->createTransport('doctrine://default', [], $this->createStub(SerializerInterface::class)); } } diff --git a/Tests/Transport/DoctrineTransportTest.php b/Tests/Transport/DoctrineTransportTest.php index 40285e2..654465e 100644 --- a/Tests/Transport/DoctrineTransportTest.php +++ b/Tests/Transport/DoctrineTransportTest.php @@ -33,8 +33,8 @@ public function testItIsATransport() public function testReceivesMessages() { $transport = $this->getTransport( - $serializer = $this->createMock(SerializerInterface::class), - $connection = $this->createMock(Connection::class) + $serializer = $this->createStub(SerializerInterface::class), + $connection = $this->createStub(Connection::class) ); $decodedMessage = new DummyMessage('Decoded.'); @@ -60,7 +60,7 @@ public function testConfigureSchema() ); $schema = new Schema(); - $dbalConnection = $this->createMock(DbalConnection::class); + $dbalConnection = $this->createStub(DbalConnection::class); $connection->expects($this->once()) ->method('configureSchema') @@ -71,8 +71,8 @@ public function testConfigureSchema() private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): DoctrineTransport { - $serializer ??= $this->createMock(SerializerInterface::class); - $connection ??= $this->createMock(Connection::class); + $serializer ??= $this->createStub(SerializerInterface::class); + $connection ??= $this->createStub(Connection::class); return new DoctrineTransport($connection, $serializer); } diff --git a/Tests/Transport/PostgreSqlConnectionTest.php b/Tests/Transport/PostgreSqlConnectionTest.php index 71dfcc3..d09c54e 100644 --- a/Tests/Transport/PostgreSqlConnectionTest.php +++ b/Tests/Transport/PostgreSqlConnectionTest.php @@ -11,8 +11,8 @@ namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; -use Doctrine\DBAL\Cache\ArrayResult; use Doctrine\DBAL\Cache\ArrayStatement; +use Doctrine\DBAL\Driver\Result as DriverResult; use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; @@ -30,7 +30,7 @@ public function testSerialize() $this->expectException(\BadMethodCallException::class); $this->expectExceptionMessage('Cannot serialize '.PostgreSqlConnection::class); - $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); + $driverConnection = $this->createStub(\Doctrine\DBAL\Connection::class); $driverConnection->method('executeStatement')->willReturn(1); $connection = new PostgreSqlConnection([], $driverConnection); @@ -42,11 +42,11 @@ public function testUnserialize() $this->expectException(\BadMethodCallException::class); $this->expectExceptionMessage('Cannot unserialize '.PostgreSqlConnection::class); - $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); + $driverConnection = $this->createStub(\Doctrine\DBAL\Connection::class); $driverConnection->method('executeStatement')->willReturn(1); $connection = new PostgreSqlConnection([], $driverConnection); - $connection->__wakeup(); + $connection->__unserialize([]); } public function testListenOnConnection() @@ -55,16 +55,14 @@ public function testListenOnConnection() $driverConnection->method('executeStatement')->willReturn(1); $driverConnection - ->expects(self::any()) ->method('getDatabasePlatform') ->willReturn(new PostgreSQLPlatform()); $driverConnection - ->expects(self::any()) ->method('createQueryBuilder') ->willReturn(new QueryBuilder($driverConnection)); - $wrappedConnection = new class() { + $wrappedConnection = new class { private int $notifyCalls = 0; public function pgsqlGetNotify() @@ -74,6 +72,13 @@ public function pgsqlGetNotify() return false; } + public function getNotify() + { + ++$this->notifyCalls; + + return false; + } + public function countNotifyCalls() { return $this->notifyCalls; @@ -88,7 +93,6 @@ public function countNotifyCalls() ->willReturn($wrappedConnection); $driverConnection - ->expects(self::any()) ->method('executeQuery') ->willReturn(new ArrayStatement([])); } else { @@ -98,10 +102,12 @@ public function countNotifyCalls() ->method('getNativeConnection') ->willReturn($wrappedConnection); + $driverResult = $this->createMock(DriverResult::class); + $driverResult->method('fetchAssociative') + ->willReturn(false); $driverConnection - ->expects(self::any()) ->method('executeQuery') - ->willReturn(new Result(new ArrayResult([]), $driverConnection)); + ->willReturn(new Result($driverResult, $driverConnection)); } $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection); @@ -109,43 +115,18 @@ public function countNotifyCalls() $connection->get(); $connection->get(); - $this->assertSame(2, $wrappedConnection->countNotifyCalls()); - } - - public function testGetExtraSetupSql() - { - $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); - $driverConnection->method('executeStatement')->willReturn(1); - $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection); - - $table = new Table('queue_table'); - $table->addOption('_symfony_messenger_table_name', 'queue_table'); - $sql = implode("\n", $connection->getExtraSetupSqlForTable($table)); - - $this->assertStringContainsString('CREATE TRIGGER', $sql); - - // We MUST NOT use transaction, that will mess with the PDO in PHP 8 - $this->assertStringNotContainsString('BEGIN;', $sql); - $this->assertStringNotContainsString('COMMIT;', $sql); - } + $this->assertTrue($connection->isListening()); - public function testTransformTableNameWithSchemaToValidProcedureName() - { - $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); - $driverConnection->method('executeStatement')->willReturn(1); - $connection = new PostgreSqlConnection(['table_name' => 'schema.queue_table'], $driverConnection); + $this->assertSame(2, $wrappedConnection->countNotifyCalls()); - $table = new Table('schema.queue_table'); - $table->addOption('_symfony_messenger_table_name', 'schema.queue_table'); - $sql = implode("\n", $connection->getExtraSetupSqlForTable($table)); + $connection->__destruct(); - $this->assertStringContainsString('CREATE OR REPLACE FUNCTION schema.notify_queue_table', $sql); - $this->assertStringContainsString('FOR EACH ROW EXECUTE PROCEDURE schema.notify_queue_table()', $sql); + $this->assertFalse($connection->isListening()); } public function testGetExtraSetupSqlWrongTable() { - $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); + $driverConnection = $this->createStub(\Doctrine\DBAL\Connection::class); $driverConnection->method('executeStatement')->willReturn(1); $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection); @@ -153,4 +134,12 @@ public function testGetExtraSetupSqlWrongTable() // don't set the _symfony_messenger_table_name option $this->assertSame([], $connection->getExtraSetupSqlForTable($table)); } + + public function testIsListeningReturnsFalseWhenGetHasNotBeenCalled() + { + $driverConnection = $this->createStub(\Doctrine\DBAL\Connection::class); + $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection); + + $this->assertFalse($connection->isListening()); + } } diff --git a/Transport/Connection.php b/Transport/Connection.php index 34b4715..770284d 100644 --- a/Transport/Connection.php +++ b/Transport/Connection.php @@ -13,24 +13,25 @@ use Doctrine\DBAL\Abstraction\Result as AbstractionResult; use Doctrine\DBAL\Connection as DBALConnection; -use Doctrine\DBAL\Driver\Exception as DriverException; use Doctrine\DBAL\Driver\ResultStatement; use Doctrine\DBAL\Exception as DBALException; use Doctrine\DBAL\Exception\TableNotFoundException; use Doctrine\DBAL\LockMode; -use Doctrine\DBAL\Platforms\MySQLPlatform; use Doctrine\DBAL\Platforms\OraclePlatform; use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; +use Doctrine\DBAL\Schema\AbstractAsset; use Doctrine\DBAL\Schema\AbstractSchemaManager; use Doctrine\DBAL\Schema\Comparator; +use Doctrine\DBAL\Schema\ComparatorConfig; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\SchemaDiff; use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer; use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Types; +use Satag\DoctrineFirebirdDriver\Platforms\FirebirdPlatform; use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Contracts\Service\ResetInterface; @@ -52,6 +53,8 @@ class Connection implements ResetInterface 'auto_setup' => true, ]; + private const ORACLE_SEQUENCES_SUFFIX = '_seq'; + /** * Configuration of the connection. * @@ -107,13 +110,13 @@ public static function buildConfiguration(#[\SensitiveParameter] string $dsn, ar // check for extra keys in options $optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS)); if (0 < \count($optionsExtraKeys)) { - throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s].', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS)))); + throw new InvalidArgumentException(\sprintf('Unknown option found: [%s]. Allowed options are [%s].', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS)))); } // check for extra keys in options $queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS)); if (0 < \count($queryExtraKeys)) { - throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS)))); + throw new InvalidArgumentException(\sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS)))); } return $configuration; @@ -129,7 +132,7 @@ public static function buildConfiguration(#[\SensitiveParameter] string $dsn, ar public function send(string $body, array $headers, int $delay = 0): string { $now = new \DateTimeImmutable('UTC'); - $availableAt = $now->modify(sprintf('%+d seconds', $delay / 1000)); + $availableAt = $now->modify(\sprintf('%+d seconds', $delay / 1000)); $queryBuilder = $this->driverConnection->createQueryBuilder() ->insert($this->configuration['table_name']) @@ -158,18 +161,6 @@ public function send(string $body, array $headers, int $delay = 0): string public function get(): ?array { - if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) { - try { - $this->driverConnection->delete($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59']); - } catch (DriverException $e) { - // Ignore the exception - } catch (TableNotFoundException $e) { - if ($this->autoSetup) { - $this->setup(); - } - } - } - get: $this->driverConnection->beginTransaction(); try { @@ -211,8 +202,8 @@ public function get(): ?array } elseif (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) { $fromClause = $matches[1]; $sql = str_replace( - sprintf('FROM %s WHERE', $fromClause), - sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)), + \sprintf('FROM %s WHERE', $fromClause), + \sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)), $sql ); } @@ -271,10 +262,6 @@ public function get(): ?array public function ack(string $id): bool { try { - if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) { - return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0; - } - return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0; } catch (DBALException $exception) { throw new TransportException($exception->getMessage(), 0, $exception); @@ -284,10 +271,6 @@ public function ack(string $id): bool public function reject(string $id): bool { try { - if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) { - return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0; - } - return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0; } catch (DBALException $exception) { throw new TransportException($exception->getMessage(), 0, $exception); @@ -298,7 +281,17 @@ public function setup(): void { $configuration = $this->driverConnection->getConfiguration(); $assetFilter = $configuration->getSchemaAssetsFilter(); - $configuration->setSchemaAssetsFilter(fn (string $tableName) => $tableName === $this->configuration['table_name']); + $configuration->setSchemaAssetsFilter(function ($tableName) { + if ($tableName instanceof AbstractAsset) { + $tableName = $tableName->getName(); + } + + if (!\is_string($tableName)) { + throw new \TypeError(\sprintf('The table name must be an instance of "%s" or a string ("%s" given).', AbstractAsset::class, get_debug_type($tableName))); + } + + return $tableName === $this->configuration['table_name']; + }); $this->updateSchema(); $configuration->setSchemaAssetsFilter($assetFilter); $this->autoSetup = false; @@ -367,7 +360,7 @@ public function getExtraSetupSqlForTable(Table $createdTable): array private function createAvailableMessagesQueryBuilder(): QueryBuilder { $now = new \DateTimeImmutable('UTC'); - $redeliverLimit = $now->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout'])); + $redeliverLimit = $now->modify(\sprintf('-%d seconds', $this->configuration['redeliver_timeout'])); return $this->createQueryBuilder() ->where('m.queue_name = ?') @@ -391,7 +384,9 @@ private function createQueryBuilder(string $alias = 'm'): QueryBuilder $alias .= '.'; - if (!$this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { + if (!$this->driverConnection->getDatabasePlatform() instanceof FirebirdPlatform + && !$this->driverConnection->getDatabasePlatform() instanceof OraclePlatform + ) { return $queryBuilder->select($alias.'*'); } @@ -452,12 +447,18 @@ private function executeInsert(string $sql, array $parameters = [], array $types try { if ($this->driverConnection->getDatabasePlatform() instanceof PostgreSQLPlatform) { - $first = $this->driverConnection->fetchFirstColumn($sql, $parameters, $types); + if (!$id = $this->driverConnection->fetchFirstColumn($sql, $parameters, $types)[0] ?? null) { + throw new TransportException('no id was returned by PostgreSQL from RETURNING clause.'); + } - $id = $first[0] ?? null; + $this->driverConnection->executeStatement('SELECT pg_notify(?, ?)', [$this->configuration['table_name'], $this->configuration['queue_name']]); + } elseif ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { + $sequenceName = $this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX; - if (!$id) { - throw new TransportException('no id was returned by PostgreSQL from RETURNING clause.'); + $this->driverConnection->executeStatement($sql, $parameters, $types); + + if (!$id = (int) $this->driverConnection->fetchOne('SELECT '.$sequenceName.'.CURRVAL FROM DUAL')) { + throw new TransportException('no id was returned by Oracle from sequence: '.$sequenceName); } } else { $this->driverConnection->executeStatement($sql, $parameters, $types); @@ -496,7 +497,7 @@ private function addTableToSchema(Schema $schema): void $table = $schema->createTable($this->configuration['table_name']); // add an internal option to mark that we created this & the non-namespaced table name $table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']); - $table->addColumn('id', Types::BIGINT) + $idColumn = $table->addColumn('id', Types::BIGINT) ->setAutoincrement(true) ->setNotnull(true); $table->addColumn('body', Types::TEXT) @@ -513,9 +514,14 @@ private function addTableToSchema(Schema $schema): void $table->addColumn('delivered_at', Types::DATETIME_IMMUTABLE) ->setNotnull(false); $table->setPrimaryKey(['id']); - $table->addIndex(['queue_name']); - $table->addIndex(['available_at']); - $table->addIndex(['delivered_at']); + $table->addIndex(['queue_name', 'available_at', 'delivered_at', 'id']); + + // We need to create a sequence for Oracle and set the id column to get the correct nextval + if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { + $idColumn->setDefault($this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX.'.nextval'); + + $schema->createSequence($this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX); + } } private function decodeEnvelopeHeaders(array $doctrineEnvelope): array @@ -582,6 +588,10 @@ private function createSchemaManager(): AbstractSchemaManager private function createComparator(AbstractSchemaManager $schemaManager): Comparator { + if (class_exists(ComparatorConfig::class)) { + return $schemaManager->createComparator((new ComparatorConfig())->withReportModifiedIndexes(false)); + } + return method_exists($schemaManager, 'createComparator') ? $schemaManager->createComparator() : new Comparator(); diff --git a/Transport/DoctrineReceiver.php b/Transport/DoctrineReceiver.php index 20bd611..b07e5df 100644 --- a/Transport/DoctrineReceiver.php +++ b/Transport/DoctrineReceiver.php @@ -67,14 +67,14 @@ public function get(): iterable public function ack(Envelope $envelope): void { - $this->withRetryableExceptionRetry(function() use ($envelope) { + $this->withRetryableExceptionRetry(function () use ($envelope) { $this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId()); }); } public function reject(Envelope $envelope): void { - $this->withRetryableExceptionRetry(function() use ($envelope) { + $this->withRetryableExceptionRetry(function () use ($envelope) { $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId()); }); } @@ -141,10 +141,12 @@ private function createEnvelopeFromData(array $data): Envelope throw $exception; } - return $envelope->with( - new DoctrineReceivedStamp($data['id']), - new TransportMessageIdStamp($data['id']) - ); + return $envelope + ->withoutAll(TransportMessageIdStamp::class) + ->with( + new DoctrineReceivedStamp($data['id']), + new TransportMessageIdStamp($data['id']) + ); } private function withRetryableExceptionRetry(callable $callable): void @@ -159,7 +161,7 @@ private function withRetryableExceptionRetry(callable $callable): void $callable(); } catch (RetryableException $exception) { if (++$retries <= self::MAX_RETRIES) { - $delay *= $multiplier; + $delay *= $multiplier; $randomness = (int) ($delay * $jitter); $delay += random_int(-$randomness, +$randomness); diff --git a/Transport/PostgreSqlConnection.php b/Transport/PostgreSqlConnection.php index b3d02c0..1fce9cd 100644 --- a/Transport/PostgreSqlConnection.php +++ b/Transport/PostgreSqlConnection.php @@ -11,8 +11,6 @@ namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport; -use Doctrine\DBAL\Schema\Table; - /** * Uses PostgreSQL LISTEN/NOTIFY to push messages to workers. * @@ -24,24 +22,23 @@ */ final class PostgreSqlConnection extends Connection { + private bool $listening = false; + /** * * check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 60000 (1 minute) - * * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0. + * * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify (or Pdo\Pgsql::getNotify on PHP 8.4+), in milliseconds. Default: 0. */ protected const DEFAULT_OPTIONS = parent::DEFAULT_OPTIONS + [ 'check_delayed_interval' => 60000, 'get_notify_timeout' => 0, ]; - public function __sleep(): array + public function __serialize(): array { throw new \BadMethodCallException('Cannot serialize '.__CLASS__); } - /** - * @return void - */ - public function __wakeup() + public function __unserialize(array $data): void { throw new \BadMethodCallException('Cannot unserialize '.__CLASS__); } @@ -51,6 +48,11 @@ public function __destruct() $this->unlisten(); } + public function isListening(): bool + { + return $this->listening; + } + public function reset(): void { parent::reset(); @@ -65,7 +67,9 @@ public function get(): ?array // This is secure because the table name must be a valid identifier: // https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS - $this->executeStatement(sprintf('LISTEN "%s"', $this->configuration['table_name'])); + $this->executeStatement(\sprintf('LISTEN "%s"', $this->configuration['table_name'])); + + $this->listening = true; // The condition should be removed once support for DBAL <3.3 is dropped if (method_exists($this->driverConnection, 'getNativeConnection')) { @@ -77,7 +81,9 @@ public function get(): ?array } } - $notification = $wrappedConnection->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']); + $notification = \PHP_VERSION_ID >= 80500 + ? $wrappedConnection->getNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']) + : $wrappedConnection->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']); if ( // no notifications, or for another table or queue (false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) @@ -92,63 +98,13 @@ public function get(): ?array return parent::get(); } - public function setup(): void - { - parent::setup(); - - $this->executeStatement(implode("\n", $this->getTriggerSql())); - } - - /** - * @return string[] - */ - public function getExtraSetupSqlForTable(Table $createdTable): array - { - if (!$createdTable->hasOption(self::TABLE_OPTION_NAME)) { - return []; - } - - if ($createdTable->getOption(self::TABLE_OPTION_NAME) !== $this->configuration['table_name']) { - return []; - } - - return $this->getTriggerSql(); - } - - private function getTriggerSql(): array - { - $functionName = $this->createTriggerFunctionName(); - - return [ - // create trigger function - sprintf(<<<'SQL' -CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$ - BEGIN - PERFORM pg_notify('%2$s', NEW.queue_name::text); - RETURN NEW; - END; -$$ LANGUAGE plpgsql; -SQL - , $functionName, $this->configuration['table_name']), - // register trigger - sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->configuration['table_name']), - sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->configuration['table_name'], $functionName), - ]; - } - - private function createTriggerFunctionName(): string + private function unlisten(): void { - $tableConfig = explode('.', $this->configuration['table_name']); - - if (1 === \count($tableConfig)) { - return sprintf('notify_%1$s', $tableConfig[0]); + if (!$this->listening) { + return; } - return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]); - } - - private function unlisten(): void - { - $this->executeStatement(sprintf('UNLISTEN "%s"', $this->configuration['table_name'])); + $this->executeStatement(\sprintf('UNLISTEN "%s"', $this->configuration['table_name'])); + $this->listening = false; } }