diff --git a/.github/workflows/check-subtree-split.yml b/.github/workflows/check-subtree-split.yml deleted file mode 100644 index 16be48b..0000000 --- a/.github/workflows/check-subtree-split.yml +++ /dev/null @@ -1,37 +0,0 @@ -name: Check subtree split - -on: - pull_request_target: - -jobs: - close-pull-request: - runs-on: ubuntu-latest - - steps: - - name: Close pull request - uses: actions/github-script@v6 - with: - script: | - if (context.repo.owner === "symfony") { - github.rest.issues.createComment({ - owner: "symfony", - repo: context.repo.repo, - issue_number: context.issue.number, - body: ` - Thanks for your Pull Request! We love contributions. - - However, you should instead open your PR on the main repository: - https://github.com/symfony/symfony - - This repository is what we call a "subtree split": a read-only subset of that main repository. - We're looking forward to your PR there! - ` - }); - - github.rest.pulls.update({ - owner: "symfony", - repo: context.repo.repo, - pull_number: context.issue.number, - state: "closed" - }); - } diff --git a/.github/workflows/close-pull-request.yml b/.github/workflows/close-pull-request.yml new file mode 100644 index 0000000..e55b478 --- /dev/null +++ b/.github/workflows/close-pull-request.yml @@ -0,0 +1,20 @@ +name: Close Pull Request + +on: + pull_request_target: + types: [opened] + +jobs: + run: + runs-on: ubuntu-latest + steps: + - uses: superbrothers/close-pull-request@v3 + with: + comment: | + Thanks for your Pull Request! We love contributions. + + However, you should instead open your PR on the main repository: + https://github.com/symfony/symfony + + This repository is what we call a "subtree split": a read-only subset of that main repository. + We're looking forward to your PR there! diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php index 3544a03..d5275c9 100644 --- a/Tests/Transport/ConnectionTest.php +++ b/Tests/Transport/ConnectionTest.php @@ -584,7 +584,7 @@ class_exists(MySQLPlatform::class) ? new MySQLPlatform() : new MySQL57Platform() yield 'SQL Server' => [ class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ? new SQLServerPlatform() : new SQLServer2012Platform(), - sprintf('SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK%s) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ', method_exists(QueryBuilder::class, 'forUpdate') ? ', READPAST' : ''), + \sprintf('SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK%s) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ', method_exists(QueryBuilder::class, 'forUpdate') ? ', READPAST' : ''), ]; if (!class_exists(MySQL57Platform::class)) { @@ -597,7 +597,7 @@ class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::c // DBAL < 4 yield 'Oracle' => [ new OraclePlatform(), - sprintf('SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE%s', method_exists(QueryBuilder::class, 'forUpdate') ? ' SKIP LOCKED' : ''), + \sprintf('SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE%s', method_exists(QueryBuilder::class, 'forUpdate') ? ' SKIP LOCKED' : ''), ]; } } @@ -665,7 +665,7 @@ public function testFindAllSqlGenerated(AbstractPlatform $platform, string $expe $connection->findAll(50); } - public function provideFindAllSqlGeneratedByPlatform(): iterable + public static function provideFindAllSqlGeneratedByPlatform(): iterable { yield 'MySQL' => [ class_exists(MySQLPlatform::class) ? new MySQLPlatform() : new MySQL57Platform(), @@ -698,4 +698,21 @@ class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::c ]; } } + + public function testConfigureSchemaOracleSequenceNameSuffixed() + { + $driverConnection = $this->createMock(DBALConnection::class); + $driverConnection->method('getDatabasePlatform')->willReturn(new OraclePlatform()); + $schema = new Schema(); + + $connection = new Connection(['table_name' => 'messenger_messages'], $driverConnection); + $connection->configureSchema($schema, $driverConnection, fn () => true); + + $expectedSuffix = '_seq'; + $sequences = $schema->getSequences(); + $this->assertCount(1, $sequences); + $sequence = array_pop($sequences); + $sequenceNameSuffix = substr($sequence->getName(), -\strlen($expectedSuffix)); + $this->assertSame($expectedSuffix, $sequenceNameSuffix); + } } diff --git a/Tests/Transport/DoctrinePostgreSqlFilterIntegrationTest.php b/Tests/Transport/DoctrinePostgreSqlFilterIntegrationTest.php new file mode 100644 index 0000000..9a4738b --- /dev/null +++ b/Tests/Transport/DoctrinePostgreSqlFilterIntegrationTest.php @@ -0,0 +1,119 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; + +use Doctrine\DBAL\Configuration; +use Doctrine\DBAL\Connection; +use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Schema\Column; +use Doctrine\DBAL\Schema\DefaultSchemaManagerFactory; +use Doctrine\DBAL\Schema\Sequence; +use Doctrine\DBAL\Schema\Table; +use Doctrine\DBAL\Tools\DsnParser; +use Doctrine\DBAL\Types\Type; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection; + +/** + * This test checks on a postgres connection whether the doctrine asset filter works as expected. + * + * @requires extension pdo_pgsql + * + * @group integration + */ +class DoctrinePostgreSqlFilterIntegrationTest extends TestCase +{ + private Connection $driverConnection; + + protected function setUp(): void + { + if (!$host = getenv('POSTGRES_HOST')) { + $this->markTestSkipped('Missing POSTGRES_HOST env variable'); + } + + $url = "pdo-pgsql://postgres:password@$host"; + $params = (new DsnParser())->parse($url); + $config = new Configuration(); + if (class_exists(DefaultSchemaManagerFactory::class)) { + $config->setSchemaManagerFactory(new DefaultSchemaManagerFactory()); + } + + $this->driverConnection = DriverManager::getConnection($params, $config); + + $this->createAssets(); + } + + protected function tearDown(): void + { + $this->removeAssets(); + + $this->driverConnection->close(); + } + + public function testFilterAssets() + { + $schemaManager = $this->driverConnection->createSchemaManager(); + + $this->assertFalse($schemaManager->tablesExist(['queue_table'])); + $this->assertTrue($schemaManager->tablesExist(['app_table'])); + $this->assertTrue($this->hasSequence('app_table_id')); + + $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $this->driverConnection); + $connection->setup(); + + $schemaManager = $this->driverConnection->createSchemaManager(); + + $this->assertTrue($schemaManager->tablesExist(['queue_table'])); + $this->assertTrue($schemaManager->tablesExist(['app_table'])); + $this->assertTrue($this->hasSequence('app_table_id')); + } + + private function createAssets(): void + { + $this->removeAssets(); + + $schemaManager = $this->driverConnection->createSchemaManager(); + $schemaManager->createTable(new Table('app_table', [new Column('id', Type::getType('integer'))])); + $schemaManager->createSequence(new Sequence('app_table_id')); + } + + private function removeAssets(): void + { + $schemaManager = $this->driverConnection->createSchemaManager(); + + if ($schemaManager->tablesExist(['queue_table'])) { + $schemaManager->dropTable('queue_table'); + } + + if ($schemaManager->tablesExist(['app_table'])) { + $schemaManager->dropTable('app_table'); + } + + if ($this->hasSequence('app_table_id')) { + $schemaManager->dropSequence('app_table_id'); + } + } + + private function hasSequence(string $name): bool + { + $schemaManager = $this->driverConnection->createSchemaManager(); + + $sequences = $schemaManager->listSequences(); + foreach ($sequences as $sequence) { + if ($sequence->getName() === $name) { + return true; + } + } + + return false; + } +} diff --git a/Tests/Transport/DoctrineReceiverTest.php b/Tests/Transport/DoctrineReceiverTest.php index 36ee145..fcf4d67 100644 --- a/Tests/Transport/DoctrineReceiverTest.php +++ b/Tests/Transport/DoctrineReceiverTest.php @@ -28,6 +28,8 @@ use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer; +use Symfony\Component\Serializer\Normalizer\DateTimeNormalizer; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; class DoctrineReceiverTest extends TestCase @@ -45,7 +47,7 @@ public function testItReturnsTheDecodedMessageToTheHandler() $this->assertCount(1, $actualEnvelopes); /** @var Envelope $actualEnvelope */ $actualEnvelope = $actualEnvelopes[0]; - $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); /** @var DoctrineReceivedStamp $doctrineReceivedStamp */ $doctrineReceivedStamp = $actualEnvelope->last(DoctrineReceivedStamp::class); @@ -100,6 +102,23 @@ public function testOccursRetryableExceptionFromConnection() $receiver->get(); } + public function testGetReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('get')->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->get(); + /** @var Envelope $actualEnvelope */ + $actualEnvelope = $actualEnvelopes[0]; + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + public function testAll() { $serializer = $this->createSerializer(); @@ -115,6 +134,24 @@ public function testAll() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } + public function testAllReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope1 = $this->createRetriedDoctrineEnvelope(); + $doctrineEnvelope2 = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('findAll')->willReturn([$doctrineEnvelope1, $doctrineEnvelope2]); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->all(); + foreach ($actualEnvelopes as $actualEnvelope) { + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + } + public function testFind() { $serializer = $this->createSerializer(); @@ -128,6 +165,21 @@ public function testFind() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); } + public function testFindReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('find')->with(3)->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelope = $receiver->find(3); + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + public function testAck() { $serializer = $this->createSerializer(); @@ -195,7 +247,7 @@ public function testAckThrowsRetryableExceptionAndRetriesFail() ->with('1') ->willThrowException($deadlockException); - self::expectException(TransportException::class); + $this->expectException(TransportException::class); $receiver->ack($envelope); } @@ -215,7 +267,7 @@ public function testAckThrowsException() ->with('1') ->willThrowException($exception); - self::expectException($exception::class); + $this->expectException($exception::class); $receiver->ack($envelope); } @@ -286,7 +338,7 @@ public function testRejectThrowsRetryableExceptionAndRetriesFail() ->with('1') ->willThrowException($deadlockException); - self::expectException(TransportException::class); + $this->expectException(TransportException::class); $receiver->reject($envelope); } @@ -306,7 +358,7 @@ public function testRejectThrowsException() ->with('1') ->willThrowException($exception); - self::expectException($exception::class); + $this->expectException($exception::class); $receiver->reject($envelope); } @@ -321,12 +373,27 @@ private function createDoctrineEnvelope(): array ]; } + private function createRetriedDoctrineEnvelope(): array + { + return [ + 'id' => 3, + 'body' => '{"message": "Hi"}', + 'headers' => [ + 'type' => DummyMessage::class, + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\BusNameStamp' => '[{"busName":"messenger.bus.default"}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\TransportMessageIdStamp' => '[{"id":1},{"id":2}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\ErrorDetailsStamp' => '[{"exceptionClass":"Symfony\\\\Component\\\\Messenger\\\\Exception\\\\RecoverableMessageHandlingException","exceptionCode":0,"exceptionMessage":"","flattenException":null}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\DelayStamp' => '[{"delay":1000},{"delay":1000}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\RedeliveryStamp' => '[{"retryCount":1,"redeliveredAt":"2025-01-05T13:58:25+00:00"},{"retryCount":2,"redeliveredAt":"2025-01-05T13:59:26+00:00"}]', + 'Content-Type' => 'application/json', + ], + ]; + } + private function createSerializer(): Serializer { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + return new Serializer( + new SerializerComponent\Serializer([new DateTimeNormalizer(), new ArrayDenormalizer(), new ObjectNormalizer()], ['json' => new JsonEncoder()]) ); - - return $serializer; } } diff --git a/Tests/Transport/DoctrineSenderTest.php b/Tests/Transport/DoctrineSenderTest.php index 8505e3d..1f76953 100644 --- a/Tests/Transport/DoctrineSenderTest.php +++ b/Tests/Transport/DoctrineSenderTest.php @@ -31,7 +31,7 @@ public function testSend() $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'])->willReturn('15'); $serializer = $this->createMock(SerializerInterface::class); - $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + $serializer->method('encode')->with($envelope)->willReturn($encoded); $sender = new DoctrineSender($connection, $serializer); $actualEnvelope = $sender->send($envelope); @@ -51,7 +51,7 @@ public function testSendWithDelay() $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500); $serializer = $this->createMock(SerializerInterface::class); - $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + $serializer->method('encode')->with($envelope)->willReturn($encoded); $sender = new DoctrineSender($connection, $serializer); $sender->send($envelope); diff --git a/Tests/Transport/PostgreSqlConnectionTest.php b/Tests/Transport/PostgreSqlConnectionTest.php index 71dfcc3..1b67516 100644 --- a/Tests/Transport/PostgreSqlConnectionTest.php +++ b/Tests/Transport/PostgreSqlConnectionTest.php @@ -11,8 +11,8 @@ namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; -use Doctrine\DBAL\Cache\ArrayResult; use Doctrine\DBAL\Cache\ArrayStatement; +use Doctrine\DBAL\Driver\Result as DriverResult; use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; @@ -46,7 +46,7 @@ public function testUnserialize() $driverConnection->method('executeStatement')->willReturn(1); $connection = new PostgreSqlConnection([], $driverConnection); - $connection->__wakeup(); + $connection->__unserialize([]); } public function testListenOnConnection() @@ -64,7 +64,7 @@ public function testListenOnConnection() ->method('createQueryBuilder') ->willReturn(new QueryBuilder($driverConnection)); - $wrappedConnection = new class() { + $wrappedConnection = new class { private int $notifyCalls = 0; public function pgsqlGetNotify() @@ -98,10 +98,13 @@ public function countNotifyCalls() ->method('getNativeConnection') ->willReturn($wrappedConnection); + $driverResult = $this->createMock(DriverResult::class); + $driverResult->method('fetchAssociative') + ->willReturn(false); $driverConnection ->expects(self::any()) ->method('executeQuery') - ->willReturn(new Result(new ArrayResult([]), $driverConnection)); + ->willReturn(new Result($driverResult, $driverConnection)); } $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection); diff --git a/Transport/Connection.php b/Transport/Connection.php index 34b4715..e51d961 100644 --- a/Transport/Connection.php +++ b/Transport/Connection.php @@ -24,13 +24,16 @@ use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; +use Doctrine\DBAL\Schema\AbstractAsset; use Doctrine\DBAL\Schema\AbstractSchemaManager; use Doctrine\DBAL\Schema\Comparator; +use Doctrine\DBAL\Schema\ComparatorConfig; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\SchemaDiff; use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer; use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Types; +use Satag\DoctrineFirebirdDriver\Platforms\FirebirdPlatform; use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Contracts\Service\ResetInterface; @@ -52,6 +55,8 @@ class Connection implements ResetInterface 'auto_setup' => true, ]; + private const ORACLE_SEQUENCES_SUFFIX = '_seq'; + /** * Configuration of the connection. * @@ -107,13 +112,13 @@ public static function buildConfiguration(#[\SensitiveParameter] string $dsn, ar // check for extra keys in options $optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS)); if (0 < \count($optionsExtraKeys)) { - throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s].', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS)))); + throw new InvalidArgumentException(\sprintf('Unknown option found: [%s]. Allowed options are [%s].', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS)))); } // check for extra keys in options $queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS)); if (0 < \count($queryExtraKeys)) { - throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS)))); + throw new InvalidArgumentException(\sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS)))); } return $configuration; @@ -129,7 +134,7 @@ public static function buildConfiguration(#[\SensitiveParameter] string $dsn, ar public function send(string $body, array $headers, int $delay = 0): string { $now = new \DateTimeImmutable('UTC'); - $availableAt = $now->modify(sprintf('%+d seconds', $delay / 1000)); + $availableAt = $now->modify(\sprintf('%+d seconds', $delay / 1000)); $queryBuilder = $this->driverConnection->createQueryBuilder() ->insert($this->configuration['table_name']) @@ -211,8 +216,8 @@ public function get(): ?array } elseif (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) { $fromClause = $matches[1]; $sql = str_replace( - sprintf('FROM %s WHERE', $fromClause), - sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)), + \sprintf('FROM %s WHERE', $fromClause), + \sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)), $sql ); } @@ -298,7 +303,17 @@ public function setup(): void { $configuration = $this->driverConnection->getConfiguration(); $assetFilter = $configuration->getSchemaAssetsFilter(); - $configuration->setSchemaAssetsFilter(fn (string $tableName) => $tableName === $this->configuration['table_name']); + $configuration->setSchemaAssetsFilter(function ($tableName) { + if ($tableName instanceof AbstractAsset) { + $tableName = $tableName->getName(); + } + + if (!\is_string($tableName)) { + throw new \TypeError(\sprintf('The table name must be an instance of "%s" or a string ("%s" given).', AbstractAsset::class, get_debug_type($tableName))); + } + + return $tableName === $this->configuration['table_name']; + }); $this->updateSchema(); $configuration->setSchemaAssetsFilter($assetFilter); $this->autoSetup = false; @@ -367,7 +382,7 @@ public function getExtraSetupSqlForTable(Table $createdTable): array private function createAvailableMessagesQueryBuilder(): QueryBuilder { $now = new \DateTimeImmutable('UTC'); - $redeliverLimit = $now->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout'])); + $redeliverLimit = $now->modify(\sprintf('-%d seconds', $this->configuration['redeliver_timeout'])); return $this->createQueryBuilder() ->where('m.queue_name = ?') @@ -391,7 +406,9 @@ private function createQueryBuilder(string $alias = 'm'): QueryBuilder $alias .= '.'; - if (!$this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { + if (!$this->driverConnection->getDatabasePlatform() instanceof FirebirdPlatform + && !$this->driverConnection->getDatabasePlatform() instanceof OraclePlatform + ) { return $queryBuilder->select($alias.'*'); } @@ -459,6 +476,18 @@ private function executeInsert(string $sql, array $parameters = [], array $types if (!$id) { throw new TransportException('no id was returned by PostgreSQL from RETURNING clause.'); } + } elseif ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { + $sequenceName = $this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX; + + $this->driverConnection->executeStatement($sql, $parameters, $types); + + $result = $this->driverConnection->fetchOne('SELECT '.$sequenceName.'.CURRVAL FROM DUAL'); + + $id = (int) $result; + + if (!$id) { + throw new TransportException('no id was returned by Oracle from sequence: '.$sequenceName); + } } else { $this->driverConnection->executeStatement($sql, $parameters, $types); @@ -496,7 +525,7 @@ private function addTableToSchema(Schema $schema): void $table = $schema->createTable($this->configuration['table_name']); // add an internal option to mark that we created this & the non-namespaced table name $table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']); - $table->addColumn('id', Types::BIGINT) + $idColumn = $table->addColumn('id', Types::BIGINT) ->setAutoincrement(true) ->setNotnull(true); $table->addColumn('body', Types::TEXT) @@ -516,6 +545,13 @@ private function addTableToSchema(Schema $schema): void $table->addIndex(['queue_name']); $table->addIndex(['available_at']); $table->addIndex(['delivered_at']); + + // We need to create a sequence for Oracle and set the id column to get the correct nextval + if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { + $idColumn->setDefault($this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX.'.nextval'); + + $schema->createSequence($this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX); + } } private function decodeEnvelopeHeaders(array $doctrineEnvelope): array @@ -582,6 +618,10 @@ private function createSchemaManager(): AbstractSchemaManager private function createComparator(AbstractSchemaManager $schemaManager): Comparator { + if (class_exists(ComparatorConfig::class)) { + return $schemaManager->createComparator((new ComparatorConfig())->withReportModifiedIndexes(false)); + } + return method_exists($schemaManager, 'createComparator') ? $schemaManager->createComparator() : new Comparator(); diff --git a/Transport/DoctrineReceiver.php b/Transport/DoctrineReceiver.php index 20bd611..b07e5df 100644 --- a/Transport/DoctrineReceiver.php +++ b/Transport/DoctrineReceiver.php @@ -67,14 +67,14 @@ public function get(): iterable public function ack(Envelope $envelope): void { - $this->withRetryableExceptionRetry(function() use ($envelope) { + $this->withRetryableExceptionRetry(function () use ($envelope) { $this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId()); }); } public function reject(Envelope $envelope): void { - $this->withRetryableExceptionRetry(function() use ($envelope) { + $this->withRetryableExceptionRetry(function () use ($envelope) { $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId()); }); } @@ -141,10 +141,12 @@ private function createEnvelopeFromData(array $data): Envelope throw $exception; } - return $envelope->with( - new DoctrineReceivedStamp($data['id']), - new TransportMessageIdStamp($data['id']) - ); + return $envelope + ->withoutAll(TransportMessageIdStamp::class) + ->with( + new DoctrineReceivedStamp($data['id']), + new TransportMessageIdStamp($data['id']) + ); } private function withRetryableExceptionRetry(callable $callable): void @@ -159,7 +161,7 @@ private function withRetryableExceptionRetry(callable $callable): void $callable(); } catch (RetryableException $exception) { if (++$retries <= self::MAX_RETRIES) { - $delay *= $multiplier; + $delay *= $multiplier; $randomness = (int) ($delay * $jitter); $delay += random_int(-$randomness, +$randomness); diff --git a/Transport/PostgreSqlConnection.php b/Transport/PostgreSqlConnection.php index b3d02c0..5c372ed 100644 --- a/Transport/PostgreSqlConnection.php +++ b/Transport/PostgreSqlConnection.php @@ -33,15 +33,12 @@ final class PostgreSqlConnection extends Connection 'get_notify_timeout' => 0, ]; - public function __sleep(): array + public function __serialize(): array { throw new \BadMethodCallException('Cannot serialize '.__CLASS__); } - /** - * @return void - */ - public function __wakeup() + public function __unserialize(array $data): void { throw new \BadMethodCallException('Cannot unserialize '.__CLASS__); } @@ -65,7 +62,7 @@ public function get(): ?array // This is secure because the table name must be a valid identifier: // https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS - $this->executeStatement(sprintf('LISTEN "%s"', $this->configuration['table_name'])); + $this->executeStatement(\sprintf('LISTEN "%s"', $this->configuration['table_name'])); // The condition should be removed once support for DBAL <3.3 is dropped if (method_exists($this->driverConnection, 'getNativeConnection')) { @@ -121,7 +118,7 @@ private function getTriggerSql(): array return [ // create trigger function - sprintf(<<<'SQL' + \sprintf(<<<'SQL' CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$ BEGIN PERFORM pg_notify('%2$s', NEW.queue_name::text); @@ -131,8 +128,8 @@ private function getTriggerSql(): array SQL , $functionName, $this->configuration['table_name']), // register trigger - sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->configuration['table_name']), - sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->configuration['table_name'], $functionName), + \sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->configuration['table_name']), + \sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->configuration['table_name'], $functionName), ]; } @@ -141,14 +138,14 @@ private function createTriggerFunctionName(): string $tableConfig = explode('.', $this->configuration['table_name']); if (1 === \count($tableConfig)) { - return sprintf('notify_%1$s', $tableConfig[0]); + return \sprintf('notify_%1$s', $tableConfig[0]); } - return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]); + return \sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]); } private function unlisten(): void { - $this->executeStatement(sprintf('UNLISTEN "%s"', $this->configuration['table_name'])); + $this->executeStatement(\sprintf('UNLISTEN "%s"', $this->configuration['table_name'])); } }