8000 [Messenger] Setup the doctrine transport when consuming · symfony/symfony@b2f3b53 · GitHub
[go: up one dir, main page]

Skip to content

Commit b2f3b53

Browse files
committed
[Messenger] Setup the doctrine transport when consuming
1 parent de7d7a9 commit b2f3b53

File tree

3 files changed

+32
-5
lines changed

3 files changed

+32
-5
lines changed

src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Doctrine\DBAL\Driver\Statement;
1616
use Doctrine\DBAL\Platforms\AbstractPlatform;
1717
use Doctrine\DBAL\Query\QueryBuilder;
18+
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
1819
use PHPUnit\Framework\TestCase;
1920
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
2021
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
@@ -25,6 +26,7 @@ public function testGetAMessageWillChangeItsStatus()
2526
{
2627
$queryBuilder = $this->getQueryBuilderMock();
2728
$driverConnection = $this->getDBALConnectionMock();
29+
$schemaSynchronizer = $this->getSchemaSynchronizerMock();
2830
$stmt = $this->getStatementMock([
2931
'id' => 1,
3032
'body' => '{"message":"Hi"}',
@@ -44,7 +46,7 @@ public function testGetAMessageWillChangeItsStatus()
4446
->method('prepare')
4547
->willReturn($stmt);
4648

47-
$connection = new Connection([], $driverConnection);
49+
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
4850
$doctrineEnvelope = $connection->get();
4951
$this->assertEquals(1, $doctrineEnvelope['id']);
5052
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
@@ -55,6 +57,7 @@ public function testGetWithNoPendingMessageWillReturnNull()
5557
{
5658
$queryBuilder = $this->getQueryBuilderMock();
5759
$driverConnection = $this->getDBALConnectionMock();
60+
$schemaSynchronizer = $this->getSchemaSynchronizerMock();
5861
$stmt = $this->getStatementMock(false);
5962

6063
$queryBuilder
@@ -68,7 +71,7 @@ public function testGetWithNoPendingMessageWillReturnNull()
6871
$driverConnection->expects($this->never())
6972
->method('update');
7073

71-
$connection = new Connection([], $driverConnection);
74+
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
7275
$doctrineEnvelope = $connection->get();
7376
$this->assertNull($doctrineEnvelope);
7477
}
@@ -142,6 +145,12 @@ private function getStatementMock($expectedResult)
142145
return $stmt;
143146
}
144147

148+
private function getSchemaSynchronizerMock()
149+
{
150+
return $this->getMockBuilder(SchemaSynchronizer::class)
151+
->getMock();
152+
}
153+
145154
/**
146155
* @dataProvider buildConfigurationProvider
147156
*/

src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,17 @@ public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
164164
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
165165
$this->connection->reject($next['id']);
166166
}
167+
168+
public function testTheTransportIsSetupOnGet()
169+
{
170+
// If the table does not exist and we call the get (i.e run messenger:consume) the table must be setup
171+
// so first delete the tables
172+
$this->driverConnection->exec('DROP TABLE messenger_messages');
173+
174+
$this->assertNull($this->connection->get());
175+
176+
$this->connection->send('the body', ['my' => 'header']);
177+
$envelope = $this->connection->get();
178+
$this->assertEquals('the body', $envelope['body']);
179+
}
1 8000 67180
}

src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Doctrine\DBAL\Exception\TableNotFoundException;
1717
use Doctrine\DBAL\Query\QueryBuilder;
1818
use Doctrine\DBAL\Schema\Schema;
19+
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
1920
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
2021
use Doctrine\DBAL\Types\Type;
2122
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
@@ -50,11 +51,13 @@ class Connection
5051
*/
5152
private $configuration = [];
5253
private $driverConnection;
54+
private $schemaSynchronizer;
5355

54-
public function __construct(array $configuration, DBALConnection $driverConnection)
56+
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
5557
{
5658
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
5759
$this->driverConnection = $driverConnection;
60+
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
5861
}
5962

6063
public function getConfiguration(): array
@@ -127,6 +130,9 @@ public function send(string $body, array $headers, int $delay = 0): void
127130

128131
public function get(): ?array
129132
{
133+
if ($this->configuration['auto_setup']) {
134+
$this->setup();
135+
}
130136
$this->driverConnection->beginTransaction();
131137
try {
132138
$query = $this->createAvailableMessagesQueryBuilder()
@@ -187,8 +193,7 @@ public function reject(string $id): bool
187193

188194
public function setup(): void
189195
{
190-
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
191-
$synchronizer->updateSchema($this->getSchema(), true);
196+
$this->schemaSynchronizer->updateSchema($this->getSchema(), true);
192197
}
193198

194199
public function getMessageCount(): int

0 commit comments

Comments
 (0)
0