diff --git a/.github/workflows/check-subtree-split.yml b/.github/workflows/check-subtree-split.yml deleted file mode 100644 index 16be48b..0000000 --- a/.github/workflows/check-subtree-split.yml +++ /dev/null @@ -1,37 +0,0 @@ -name: Check subtree split - -on: - pull_request_target: - -jobs: - close-pull-request: - runs-on: ubuntu-latest - - steps: - - name: Close pull request - uses: actions/github-script@v6 - with: - script: | - if (context.repo.owner === "symfony") { - github.rest.issues.createComment({ - owner: "symfony", - repo: context.repo.repo, - issue_number: context.issue.number, - body: ` - Thanks for your Pull Request! We love contributions. - - However, you should instead open your PR on the main repository: - https://github.com/symfony/symfony - - This repository is what we call a "subtree split": a read-only subset of that main repository. - We're looking forward to your PR there! - ` - }); - - github.rest.pulls.update({ - owner: "symfony", - repo: context.repo.repo, - pull_number: context.issue.number, - state: "closed" - }); - } diff --git a/.github/workflows/close-pull-request.yml b/.github/workflows/close-pull-request.yml new file mode 100644 index 0000000..e55b478 --- /dev/null +++ b/.github/workflows/close-pull-request.yml @@ -0,0 +1,20 @@ +name: Close Pull Request + +on: + pull_request_target: + types: [opened] + +jobs: + run: + runs-on: ubuntu-latest + steps: + - uses: superbrothers/close-pull-request@v3 + with: + comment: | + Thanks for your Pull Request! We love contributions. + + However, you should instead open your PR on the main repository: + https://github.com/symfony/symfony + + This repository is what we call a "subtree split": a read-only subset of that main repository. + We're looking forward to your PR there! diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php index 3544a03..240b6a8 100644 --- a/Tests/Transport/ConnectionTest.php +++ b/Tests/Transport/ConnectionTest.php @@ -665,7 +665,7 @@ public function testFindAllSqlGenerated(AbstractPlatform $platform, string $expe $connection->findAll(50); } - public function provideFindAllSqlGeneratedByPlatform(): iterable + public static function provideFindAllSqlGeneratedByPlatform(): iterable { yield 'MySQL' => [ class_exists(MySQLPlatform::class) ? new MySQLPlatform() : new MySQL57Platform(), @@ -698,4 +698,21 @@ class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::c ]; } } + + public function testConfigureSchemaOracleSequenceNameSuffixed() + { + $driverConnection = $this->createMock(DBALConnection::class); + $driverConnection->method('getDatabasePlatform')->willReturn(new OraclePlatform()); + $schema = new Schema(); + + $connection = new Connection(['table_name' => 'messenger_messages'], $driverConnection); + $connection->configureSchema($schema, $driverConnection, fn () => true); + + $expectedSuffix = '_seq'; + $sequences = $schema->getSequences(); + $this->assertCount(1, $sequences); + $sequence = array_pop($sequences); + $sequenceNameSuffix = substr($sequence->getName(), -strlen($expectedSuffix)); + $this->assertSame($expectedSuffix, $sequenceNameSuffix); + } } diff --git a/Tests/Transport/DoctrinePostgreSqlFilterIntegrationTest.php b/Tests/Transport/DoctrinePostgreSqlFilterIntegrationTest.php new file mode 100644 index 0000000..9a4738b --- /dev/null +++ b/Tests/Transport/DoctrinePostgreSqlFilterIntegrationTest.php @@ -0,0 +1,119 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; + +use Doctrine\DBAL\Configuration; +use Doctrine\DBAL\Connection; +use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Schema\Column; +use Doctrine\DBAL\Schema\DefaultSchemaManagerFactory; +use Doctrine\DBAL\Schema\Sequence; +use Doctrine\DBAL\Schema\Table; +use Doctrine\DBAL\Tools\DsnParser; +use Doctrine\DBAL\Types\Type; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection; + +/** + * This test checks on a postgres connection whether the doctrine asset filter works as expected. + * + * @requires extension pdo_pgsql + * + * @group integration + */ +class DoctrinePostgreSqlFilterIntegrationTest extends TestCase +{ + private Connection $driverConnection; + + protected function setUp(): void + { + if (!$host = getenv('POSTGRES_HOST')) { + $this->markTestSkipped('Missing POSTGRES_HOST env variable'); + } + + $url = "pdo-pgsql://postgres:password@$host"; + $params = (new DsnParser())->parse($url); + $config = new Configuration(); + if (class_exists(DefaultSchemaManagerFactory::class)) { + $config->setSchemaManagerFactory(new DefaultSchemaManagerFactory()); + } + + $this->driverConnection = DriverManager::getConnection($params, $config); + + $this->createAssets(); + } + + protected function tearDown(): void + { + $this->removeAssets(); + + $this->driverConnection->close(); + } + + public function testFilterAssets() + { + $schemaManager = $this->driverConnection->createSchemaManager(); + + $this->assertFalse($schemaManager->tablesExist(['queue_table'])); + $this->assertTrue($schemaManager->tablesExist(['app_table'])); + $this->assertTrue($this->hasSequence('app_table_id')); + + $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $this->driverConnection); + $connection->setup(); + + $schemaManager = $this->driverConnection->createSchemaManager(); + + $this->assertTrue($schemaManager->tablesExist(['queue_table'])); + $this->assertTrue($schemaManager->tablesExist(['app_table'])); + $this->assertTrue($this->hasSequence('app_table_id')); + } + + private function createAssets(): void + { + $this->removeAssets(); + + $schemaManager = $this->driverConnection->createSchemaManager(); + $schemaManager->createTable(new Table('app_table', [new Column('id', Type::getType('integer'))])); + $schemaManager->createSequence(new Sequence('app_table_id')); + } + + private function removeAssets(): void + { + $schemaManager = $this->driverConnection->createSchemaManager(); + + if ($schemaManager->tablesExist(['queue_table'])) { + $schemaManager->dropTable('queue_table'); + } + + if ($schemaManager->tablesExist(['app_table'])) { + $schemaManager->dropTable('app_table'); + } + + if ($this->hasSequence('app_table_id')) { + $schemaManager->dropSequence('app_table_id'); + } + } + + private function hasSequence(string $name): bool + { + $schemaManager = $this->driverConnection->createSchemaManager(); + + $sequences = $schemaManager->listSequences(); + foreach ($sequences as $sequence) { + if ($sequence->getName() === $name) { + return true; + } + } + + return false; + } +} diff --git a/Tests/Transport/DoctrineReceiverTest.php b/Tests/Transport/DoctrineReceiverTest.php index 36ee145..fcf4d67 100644 --- a/Tests/Transport/DoctrineReceiverTest.php +++ b/Tests/Transport/DoctrineReceiverTest.php @@ -28,6 +28,8 @@ use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer; +use Symfony\Component\Serializer\Normalizer\DateTimeNormalizer; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; class DoctrineReceiverTest extends TestCase @@ -45,7 +47,7 @@ public function testItReturnsTheDecodedMessageToTheHandler() $this->assertCount(1, $actualEnvelopes); /** @var Envelope $actualEnvelope */ $actualEnvelope = $actualEnvelopes[0]; - $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); /** @var DoctrineReceivedStamp $doctrineReceivedStamp */ $doctrineReceivedStamp = $actualEnvelope->last(DoctrineReceivedStamp::class); @@ -100,6 +102,23 @@ public function testOccursRetryableExceptionFromConnection() $receiver->get(); } + public function testGetReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('get')->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->get(); + /** @var Envelope $actualEnvelope */ + $actualEnvelope = $actualEnvelopes[0]; + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + public function testAll() { $serializer = $this->createSerializer(); @@ -115,6 +134,24 @@ public function testAll() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } + public function testAllReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope1 = $this->createRetriedDoctrineEnvelope(); + $doctrineEnvelope2 = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('findAll')->willReturn([$doctrineEnvelope1, $doctrineEnvelope2]); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->all(); + foreach ($actualEnvelopes as $actualEnvelope) { + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + } + public function testFind() { $serializer = $this->createSerializer(); @@ -128,6 +165,21 @@ public function testFind() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); } + public function testFindReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('find')->with(3)->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelope = $receiver->find(3); + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + public function testAck() { $serializer = $this->createSerializer(); @@ -195,7 +247,7 @@ public function testAckThrowsRetryableExceptionAndRetriesFail() ->with('1') ->willThrowException($deadlockException); - self::expectException(TransportException::class); + $this->expectException(TransportException::class); $receiver->ack($envelope); } @@ -215,7 +267,7 @@ public function testAckThrowsException() ->with('1') ->willThrowException($exception); - self::expectException($exception::class); + $this->expectException($exception::class); $receiver->ack($envelope); } @@ -286,7 +338,7 @@ public function testRejectThrowsRetryableExceptionAndRetriesFail() ->with('1') ->willThrowException($deadlockException); - self::expectException(TransportException::class); + $this->expectException(TransportException::class); $receiver->reject($envelope); } @@ -306,7 +358,7 @@ public function testRejectThrowsException() ->with('1') ->willThrowException($exception); - self::expectException($exception::class); + $this->expectException($exception::class); $receiver->reject($envelope); } @@ -321,12 +373,27 @@ private function createDoctrineEnvelope(): array ]; } + private function createRetriedDoctrineEnvelope(): array + { + return [ + 'id' => 3, + 'body' => '{"message": "Hi"}', + 'headers' => [ + 'type' => DummyMessage::class, + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\BusNameStamp' => '[{"busName":"messenger.bus.default"}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\TransportMessageIdStamp' => '[{"id":1},{"id":2}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\ErrorDetailsStamp' => '[{"exceptionClass":"Symfony\\\\Component\\\\Messenger\\\\Exception\\\\RecoverableMessageHandlingException","exceptionCode":0,"exceptionMessage":"","flattenException":null}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\DelayStamp' => '[{"delay":1000},{"delay":1000}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\RedeliveryStamp' => '[{"retryCount":1,"redeliveredAt":"2025-01-05T13:58:25+00:00"},{"retryCount":2,"redeliveredAt":"2025-01-05T13:59:26+00:00"}]', + 'Content-Type' => 'application/json', + ], + ]; + } + private function createSerializer(): Serializer { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + return new Serializer( + new SerializerComponent\Serializer([new DateTimeNormalizer(), new ArrayDenormalizer(), new ObjectNormalizer()], ['json' => new JsonEncoder()]) ); - - return $serializer; } } diff --git a/Tests/Transport/DoctrineSenderTest.php b/Tests/Transport/DoctrineSenderTest.php index 8505e3d..1f76953 100644 --- a/Tests/Transport/DoctrineSenderTest.php +++ b/Tests/Transport/DoctrineSenderTest.php @@ -31,7 +31,7 @@ public function testSend() $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'])->willReturn('15'); $serializer = $this->createMock(SerializerInterface::class); - $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + $serializer->method('encode')->with($envelope)->willReturn($encoded); $sender = new DoctrineSender($connection, $serializer); $actualEnvelope = $sender->send($envelope); @@ -51,7 +51,7 @@ public function testSendWithDelay() $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500); $serializer = $this->createMock(SerializerInterface::class); - $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + $serializer->method('encode')->with($envelope)->willReturn($encoded); $sender = new DoctrineSender($connection, $serializer); $sender->send($envelope); diff --git a/Tests/Transport/PostgreSqlConnectionTest.php b/Tests/Transport/PostgreSqlConnectionTest.php index 71dfcc3..7e2e436 100644 --- a/Tests/Transport/PostgreSqlConnectionTest.php +++ b/Tests/Transport/PostgreSqlConnectionTest.php @@ -11,8 +11,8 @@ namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; -use Doctrine\DBAL\Cache\ArrayResult; use Doctrine\DBAL\Cache\ArrayStatement; +use Doctrine\DBAL\Driver\Result as DriverResult; use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; @@ -98,10 +98,13 @@ public function countNotifyCalls() ->method('getNativeConnection') ->willReturn($wrappedConnection); + $driverResult = $this->createMock(DriverResult::class); + $driverResult->method('fetchAssociative') + ->willReturn(false); $driverConnection ->expects(self::any()) ->method('executeQuery') - ->willReturn(new Result(new ArrayResult([]), $driverConnection)); + ->willReturn(new Result($driverResult, $driverConnection)); } $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection); diff --git a/Transport/Connection.php b/Transport/Connection.php index 34b4715..034291d 100644 --- a/Transport/Connection.php +++ b/Transport/Connection.php @@ -24,6 +24,7 @@ use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; +use Doctrine\DBAL\Schema\AbstractAsset; use Doctrine\DBAL\Schema\AbstractSchemaManager; use Doctrine\DBAL\Schema\Comparator; use Doctrine\DBAL\Schema\Schema; @@ -52,6 +53,8 @@ class Connection implements ResetInterface 'auto_setup' => true, ]; + private const ORACLE_SEQUENCES_SUFFIX = '_seq'; + /** * Configuration of the connection. * @@ -298,7 +301,17 @@ public function setup(): void { $configuration = $this->driverConnection->getConfiguration(); $assetFilter = $configuration->getSchemaAssetsFilter(); - $configuration->setSchemaAssetsFilter(fn (string $tableName) => $tableName === $this->configuration['table_name']); + $configuration->setSchemaAssetsFilter(function ($tableName) { + if ($tableName instanceof AbstractAsset) { + $tableName = $tableName->getName(); + } + + if (!\is_string($tableName)) { + throw new \TypeError(sprintf('The table name must be an instance of "%s" or a string ("%s" given).', AbstractAsset::class, get_debug_type($tableName))); + } + + return $tableName === $this->configuration['table_name']; + }); $this->updateSchema(); $configuration->setSchemaAssetsFilter($assetFilter); $this->autoSetup = false; @@ -459,6 +472,18 @@ private function executeInsert(string $sql, array $parameters = [], array $types if (!$id) { throw new TransportException('no id was returned by PostgreSQL from RETURNING clause.'); } + } elseif ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { + $sequenceName = $this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX; + + $this->driverConnection->executeStatement($sql, $parameters, $types); + + $result = $this->driverConnection->fetchOne('SELECT '.$sequenceName.'.CURRVAL FROM DUAL'); + + $id = (int) $result; + + if (!$id) { + throw new TransportException('no id was returned by Oracle from sequence: '.$sequenceName); + } } else { $this->driverConnection->executeStatement($sql, $parameters, $types); @@ -496,7 +521,7 @@ private function addTableToSchema(Schema $schema): void $table = $schema->createTable($this->configuration['table_name']); // add an internal option to mark that we created this & the non-namespaced table name $table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']); - $table->addColumn('id', Types::BIGINT) + $idColumn = $table->addColumn('id', Types::BIGINT) ->setAutoincrement(true) ->setNotnull(true); $table->addColumn('body', Types::TEXT) @@ -516,6 +541,13 @@ private function addTableToSchema(Schema $schema): void $table->addIndex(['queue_name']); $table->addIndex(['available_at']); $table->addIndex(['delivered_at']); + + // We need to create a sequence for Oracle and set the id column to get the correct nextval + if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { + $idColumn->setDefault($this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX.'.nextval'); + + $schema->createSequence($this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX); + } } private function decodeEnvelopeHeaders(array $doctrineEnvelope): array diff --git a/Transport/DoctrineReceiver.php b/Transport/DoctrineReceiver.php index 20bd611..85bd607 100644 --- a/Transport/DoctrineReceiver.php +++ b/Transport/DoctrineReceiver.php @@ -141,10 +141,12 @@ private function createEnvelopeFromData(array $data): Envelope throw $exception; } - return $envelope->with( - new DoctrineReceivedStamp($data['id']), - new TransportMessageIdStamp($data['id']) - ); + return $envelope + ->withoutAll(TransportMessageIdStamp::class) + ->with( + new DoctrineReceivedStamp($data['id']), + new TransportMessageIdStamp($data['id']) + ); } private function withRetryableExceptionRetry(callable $callable): void