10000 feature #29007 [Messenger] Add a Doctrine transport (vincenttouzet) · symfony/symfony@d9e2732 · GitHub
[go: up one dir, main page]

Skip to content

Commit d9e2732

Browse files
committed
feature #29007 [Messenger] Add a Doctrine transport (vincenttouzet)
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Add a Doctrine transport | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | | License | MIT | Doc PR | symfony/symfony-docs#10616 | DoctrineBundle PR | doctrine/DoctrineBundle#868 As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component. Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database). # How it works The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868 ## Configuration To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`) ```yml # config/packages/messenger.yaml framework: messenger: transports: my_transport: "doctrine://default?queue=important" ``` ## Table schema Dispatched messages are stored into a database table with the following schema: | Column | Type | Options | Description | |--------------|----------|--------------------------|-------------------------------------------------------------------| | id | bigint | AUTO_INCREMENT, NOT NULL | Primary key | | body | text | NOT NULL | Body of the message | | headers | text | NOT NULL | Headers of the message | | queue | varchar(32) | NOT NULL | Headers of the message | | created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) | | available_at | datetime | NOT NULL | When the message is available to be handled | | delivered_at | datetime | NULL | When the message was delivered to a worker | ## Message dispatching When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish` ## Message consuming The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle. ### Getting the next message * Start a transaction * Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query) * Update the message in database to update the delivered_at columns * Commit the transaction ### Handling the message The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table. If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`. ## Message requeueing It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds) # TODO - [x] Add tests - [x] Create DOC PR - [x] PR on doctrine-bundle for transport factory - [x] Add a `available_at` column - [x] Add a `queue` column - [x] Implement the retry functionnality : See #30557 - [x] Rebase after #29476 Commits ------- 88d008c [Messenger] Add a Doctrine transport
2 parents 88042a3 + 88d008c commit d9e2732

14 files changed

+1147
-1
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ CHANGELOG
5959
* [BC BREAK] The Amqp Transport now automatically sets up the exchanges
6060
and queues by default. Previously, this was done when in "debug" mode
6161
only. Pass the `auto_setup` connection option to control this.
62-
6362
* Added a `SetupTransportsCommand` command to setup the transports
63+
* Added a Doctrine transport. For example, the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
6464

6565
4.2.0
6666
-----
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use Doctrine\DBAL\DBALException;
15+
use Doctrine\DBAL\Driver\Statement;
16+
use Doctrine\DBAL\Platforms\AbstractPlatform;
17+
use Doctrine\DBAL\Query\QueryBuilder;
18+
use PHPUnit\Framework\TestCase;
19+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
20+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
21+
22+
class ConnectionTest extends TestCase
23+
{
24+
public function testGetAMessageWillChangeItsStatus()
25+
{
26+
$queryBuilder = $this->getQueryBuilderMock();
27+
$driverConnection = $this->getDBALConnectionMock();
28+
$stmt = $this->getStatementMock([
29+
'id' => 1,
30+
'body' => '{"message":"Hi"}',
31+
'headers' => \json_encode(['type' => DummyMessage::class]),
32+
]);
33+
34+
$driverConnection
35+
->method('createQueryBuilder')
36+
->willReturn($queryBuilder);
37+
$queryBuilder
38+
->method('getSQL')
39+
->willReturn('');
40+
$driverConnection
41+
->method('prepare')
42+
->willReturn($stmt);
43+
44+
$connection = new Connection([], $driverConnection);
45+
$doctrineEnvelope = $connection->get();
46+
$this->assertEquals(1, $doctrineEnvelope['id']);
47+
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
48+
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope['headers']);
49+
}
50+
51+
public function testGetWithNoPendingMessageWillReturnNull()
52+
{
53+
$queryBuilder = $this->getQueryBuilderMock();
54+
$driverConnection = $this->getDBALConnectionMock();
55+
$stmt = $this->getStatementMock(false);
56+
57+
$driverConnection->expects($this->once())
58+
->method('createQueryBuilder')
59+
->willReturn($queryBuilder);
60+
$driverConnection->method('prepare')
61+
->willReturn($stmt);
62+
$driverConnection->expects($this->never())
63+
->method('update');
64+
65+
$connection = new Connection([], $driverConnection);
66+
$doctrineEnvelope = $connection->get();
67+
$this->assertNull($doctrineEnvelope);
68+
}
69+
70+
/**
71+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
72+
*/
73+
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
74+
{
75+
$driverConnection = $this->getDBALConnectionMock();
76+
$driverConnection->method('delete')->willThrowException(new DBALException());
77+
78+
$connection = new Connection([], $driverConnection);
79+
$connection->ack('dummy_id');
80+
}
81+
82+
/**
83+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
84+
*/
85+
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
86+
{
87+
$driverConnection = $this->getDBALConnectionMock();
88+
$driverConnection->method('delete')->willThrowException(new DBALException());
89+
90+
$connection = new Connection([], $driverConnection);
91+
$connection->reject('dummy_id');
92+
}
93+
94+
private function getDBALConnectionMock()
95+
{
96+
$driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
97+
->disableOriginalConstructor()
98+
->getMock();
99+
$platform = $this->getMockBuilder(AbstractPlatform::class)
100+
->getMock();
101+
$platform->method('getWriteLockSQL')->willReturn('FOR UPDATE');
102+
$driverConnection->method('getDatabasePlatform')->willReturn($platform);
103+
104+
return $driverConnection;
105+
}
106+
107+
private function getQueryBuilderMock()
108+
{
109+
$queryBuilder = $this->getMockBuilder(QueryBuilder::class)
110+
->disableOriginalConstructor()
111+
->getMock();
112+
113+
$queryBuilder->method('select')->willReturn($queryBuilder);
114+
$queryBuilder->method('update')->willReturn($queryBuilder);
115+
$queryBuilder->method('from')->willReturn($queryBuilder);
116+
$queryBuilder->method('set')->willReturn($queryBuilder);
117+
$queryBuilder->method('where')->willReturn($queryBuilder);
118+
$queryBuilder->method('andWhere')->willReturn($queryBuilder);
119+
$queryBuilder->method('orderBy')->willReturn($queryBuilder);
120+
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
121+
$queryBuilder->method('setParameter')->willReturn($queryBuilder);
122+
123+
return $queryBuilder;
124+
}
125+
126+
private function getStatementMock($expectedResult)
127+
{
128+
$stmt = $this->getMockBuilder(Statement::class)
129+
->disableOriginalConstructor()
130+
->getMock();
131+
$stmt->expects($this->once())
132+
->method('fetch')
133+
->willReturn($expectedResult);
134+
135+
return $stmt;
136+
}
137+
138+
/**
139+
* @dataProvider buildConfigurationProvider
140+
*/
141+
public function testBuildConfiguration($dsn, $options, $expectedManager, $expectedTableName, $expectedRedeliverTimeout, $expectedQueue)
142+
{
143+
$config = Connection::buildConfiguration($dsn, $options);
144+
$this->assertEquals($expectedManager, $config['connection']);
145+
$this->assertEquals($expectedTableName, $config['table_name']);
146+
$this->assertEquals($expectedRedeliverTimeout, $config['redeliver_timeout']);
147+
$this->assertEquals($expectedQueue, $config['queue_name']);
148+
}
149+
150+
public function buildConfigurationProvider()
151+
{
152+
return [
153+
[
154+
'dsn' => 'doctrine://default',
155+
'options' => [],
156+
'expectedManager' => 'default',
157+
'expectedTableName' => 'messenger_messages',
158+
'expectedRedeliverTimeout' => 3600,
159+
'expectedQueue' => 'default',
160+
],
161+
// test options from options array
162+
[
163+
'dsn' => 'doctrine://default',
164+
'options' => [
165+
'table_name' => 'name_from_options',
166+
'redeliver_timeout' => 1800,
167+
'queue_name' => 'important',
168+
],
169+
'expectedManager' => 'default',
170+
'expectedTableName' => 'name_from_options',
171+
'expectedRedeliverTimeout' => 1800,
172+
'expectedQueue' => 'important',
173+
],
174+
// tests options from dsn
175+
[
176+
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal',
177+
'options' => [],
178+
'expectedManager' => 'default',
179+
'expectedTableName' => 'name_from_dsn',
180+
'expectedRedeliverTimeout' => 1200,
181+
'expectedQueue' => 'normal',
182+
],
183+
// test options from options array wins over options from dsn
184+
[
185+
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal',
186+
'options' => [
187+
'table_name' => 'name_from_options',
188+
'redeliver_timeout' => 1800,
189+
'queue_name' => 'important',
190+
],
191+
'expectedManager' => 'default',
192+
'expectedTableName' => 'name_from_options',
193+
'expectedRedeliverTimeout' => 1800,
194+
'expectedQueue' => 'important',
195+
],
196+
];
197+
}
198+
199+
/**
200+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
201+
*/
202+
public function testItThrowsAnExceptionIfAnExtraOptionsInDefined()
203+
{
204+
Connection::buildConfiguration('doctrine://default', ['new_option' => 'woops']);
205+
}
206+
207+
/**
208+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
209+
*/
210+
public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
211+
{
212+
Connection::buildConfiguration('doctrine://default?new_option=woops');
213+
}
214+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use Doctrine\DBAL\DriverManager;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
18+
19+
/**
20+
* @requires pdo_mysql
21+
*/
22+
class DoctrineIntegrationTest extends TestCase
23+
{
24+
private $driverConnection;
25+
private $connection;
26+
27+
protected function setUp()
28+
{
29+
parent::setUp();
30+
31+
if (!getenv('MESSENGER_DOCTRINE_DSN')) {
32+
$this->markTestSkipped('The "MESSENGER_DOCTRINE_DSN" environment variable is required.');
33+
}
34+
$dsn = getenv('MESSENGER_DOCTRINE_DSN');
35+
$this->driverConnection = DriverManager::getConnection(['url' => $dsn]);
36+
$this->connection = new Connection([], $this->driverConnection);
37+
// call send to auto-setup the table
38+
$this->connection->setup();
39+
// ensure the table is clean for tests
40+
$this->driverConnection->exec('DELETE FROM messenger_messages');
41+
}
42+
43+
public function testConnectionSendAndGet()
44+
{
45+
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
46+
$encoded = $this->connection->get();
47+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
48+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
49+
}
50+
51+
public function testSendWithDelay()
52+
{
53+
$this->connection->send('{"message": "Hi i am delayed"}', ['type' => DummyMessage::class], 600000);
54+
55+
$available_at = $this->driverConnection->createQueryBuilder()
56+
->select('m.available_at')
57+
->from('messenger_messages', 'm')
58+
->where('m.body = :body')
59+
->setParameter(':body', '{"message": "Hi i am delayed"}')
60+
->execute()
61+
->fetchColumn();
62+
63+
$available_at = new \DateTime($available_at);
64+
65+
$now = \DateTime::createFromFormat('U.u', microtime(true));
66+
$now->modify('+60 seconds');
67+
$this->assertGreaterThan($now, $available_at);
68+
}
69+
70+
public function testItRetrieveTheFirstAvailableMessage()
71+
{
72+
// insert messages
73+
// one currently handled
74+
$this->driverConnection->insert('messenger_messages', [
75+
'body' => '{"message": "Hi handled"}',
76+
'headers' => json_encode(['type' => DummyMessage::class]),
77+
'queue_name' => 'default',
78+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
79+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
80+
'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))),
81+
]);
82+
// one available later
83+
$this->driverConnection->insert('messenger_messages', [
84+
'body' => '{"message": "Hi delayed"}',
85+
'headers' => json_encode(['type' => DummyMessage::class]),
86+
'queue_name' => 'default',
87+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
88+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 13:00:00')),
89+
]);
90+
// one available
91+
$this->driverConnection->insert('messenger_messages', [
92+
'body' => '{"message": "Hi available"}',
93+
'headers' => json_encode(['type' => DummyMessage::class]),
94+
'queue_name' => 'default',
95+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
96+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
97+
]);
98+
99+
$encoded = $this->connection->get();
100+
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
101+
}
102+
103+
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
104+
{
105+
$twoHoursAgo = new \DateTime('now');
106+
$twoHoursAgo->modify('-2 hours');
107+
$this->driverConnection->insert('messenger_messages', [
108+
'body' => '{"message": "Hi requeued"}',
109+
'headers' => json_encode(['type' => DummyMessage::class]),
110+
'queue_name' => 'default',
111+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
112+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
113+
'delivered_at' => Connection::formatDateTime($twoHoursAgo),
114+
]);
115+
$this->driverConnection->insert('messenger_messages', [
116+
'body' => '{"message": "Hi available"}',
117+
'headers' => json_encode(['type' => DummyMessage::class]),
118+
'queue_name' => 'default',
119+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
120+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
121+
]);
122+
123+
$next = $this->connection->get();
124+
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
125+
$this->connection->reject($next['id']);
126+
}
127+
}

0 commit comments

Comments
 (0)
0