8000 bug #53819 [Doctrine Messenger] Fix support for pgsql + pgbouncer. (j… · symfony/symfony@189bfeb · GitHub
[go: up one dir, main page]

Skip to content

Commit 189bfeb

Browse files
committed
bug #53819 [Doctrine Messenger] Fix support for pgsql + pgbouncer. (jwage)
This PR was squashed before being merged into the 6.4 branch. Discussion ---------- [Doctrine Messenger] Fix support for pgsql + pgbouncer. | Q | A | ------------- | --- | Branch? | 6.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Issues | | License | MIT ## Problem When you use PgBouncer in front of a PostgreSQL server with transaction pooling mode enabled, the `INSERT` and the `lastInsertId()` happen in separate transactions which are separate connections/sessions when using PgBouncer. So the call to `lastInsertId()` fails with the following exception: ``` [Doctrine\DBAL\Exception\DriverException (7)] An exception occurred in the driver: SQLSTATE[55000]: Object not in prerequisite state: 7 ERROR: lastval is not yet defined in this session Exception trace: at /app/vendor/doctrine/dbal/src/Driver/API/PostgreSQL/ExceptionConverter.php:87 Doctrine\DBAL\Driver\API\PostgreSQL\ExceptionConverter->convert() at /app/vendor/doctrine/dbal/src/Connection.php:1938 Doctrine\DBAL\Connection->handleDriverException() at /app/vendor/doctrine/dbal/src/Connection.php:1886 Doctrine\DBAL\Connection->convertException() at /app/vendor/doctrine/dbal/src/Connection.php:1253 Doctrine\DBAL\Connection->lastInsertId() at /app/vendor/symfony/doctrine-messenger/Transport/Connection.php:156 Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection->send() at /app/vendor/symfony/doctrine-messenger/Transport/DoctrineSender.php:46 Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineSender->send() at /app/vendor/symfony/doctrine-messenger/Transport/DoctrineTransport.php:72 Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport->send() at /app/vendor/symfony/messenger/EventListener/SendFailedMessageForRetryListener.php:81 Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener->onMessageFailed() at /app/vendor/symfony/event-dispatcher/EventDispatcher.php:220 Symfony\Component\EventDispatcher\EventDispatcher->callListeners() at /app/vendor/symfony/event-dispatcher/EventDispatcher.php:56 Symfony\Component\EventDispatcher\EventDispatcher->dispatch() at /app/vendor/symfony/messenger/Worker.php:198 Symfony\Component\Messenger\Worker->ack() at /app/vendor/symfony/messenger/Worker.php:174 Symfony\Component\Messenger\Worker->handleMessage() at /app/vendor/symfony/messenger/Worker.php:109 ``` ## Solution Wrap the `INSERT` and `lastInsertId()` with a single transaction, then when `lastInsertId()` is called, it will be within the same session that the message was inserted in. In addition, this PR adds the ability to use PostgresSQL `RETURNING id` clause instead of calling `lastInsertId()` so we can get the id of the inserted message in one operation instead of two. TODO: - [x] Add test for table not found scenario when inserting a message. - [x] Add tests for when lastInsertId returns false, int and string. - [x] Is there a place where I can write an integration test for this behavior that already exists? - [x] Investigate using pgsql RETURNING clause to simplify this. The insert can return the id after the message is inserted. - [ ] Squash commits to one clean commit before merge. Commits ------- c5830b4 [Doctrine Messenger] Fix support for pgsql + pgbouncer.
2 parents 7080b83 + c5830b4 commit 189bfeb

File tree

Expand file tree

7 files changed

+351
-13
lines changed

7 files changed

+351
-13
lines changed

.github/workflows/integration-tests.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,14 @@ jobs:
140140
sudo service redis-server restart
141141
echo "::endgroup::"
142142
143+
- name: Install pgbouncer
144+
run: |
145+
sudo apt-get install -y pgbouncer
146+
sudo cp src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Fixtures/pgbouncer/pgbouncer.ini /etc/pgbouncer/pgbouncer.ini
147+
sudo cp src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Fixtures/pgbouncer/userlist.txt /etc/pgbouncer/userlist.txt
148+
sudo service pgbouncer restart
149+
sudo su - postgres -c "PGPASSWORD=password psql -Atq -h localhost -p 5432 -U postgres -d postgres -c \"SELECT usename, passwd FROM pg_shadow\""
150+
143 8000 151
- name: Configure Couchbase
144152
run: |
145153
curl -s -u 'username=Administrator&password=111111' -X POST http://localhost:8091/node/controller/setupServices -d 'services=kv%2Cn1ql%2Cindex%2Cfts'
@@ -196,6 +204,7 @@ jobs:
196204
MESSENGER_SQS_FIFO_QUEUE_DSN: "sqs://localhost:4566/messages.fifo?sslmode=disable&poll_timeout=0.01"
197205
KAFKA_BROKER: 127.0.0.1:9092
198206
POSTGRES_HOST: localhost
207+
PGBOUNCER_HOST: localhost:6432
199208

200209
#- name: Run HTTP push tests
201210
# if: matrix.php == '8.1'
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[databases]
2+
postgres = host=localhost port=5432 user=postgres dbname=postgres pool_mode=transaction
3+
4+
[pgbouncer]
5+
logfile = /var/log/postgresql/pgbouncer.log
6+
pidfile = /var/run/postgresql/pgbouncer.pid
7+
listen_addr = localhost
8+
listen_port = 6432
9+
unix_socket_dir = /var/run/postgresql
10+
auth_type = md5
11+
auth_file = /etc/pgbouncer/userlist.txt
12+
max_client_conn = 20
13+
default_pool_size = 20
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"postgres" "md532e12f215ba27cb750c9e093ce4b5127"

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,104 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage()
119119
$connection->reject('dummy_id');
120120
}
121121

122+
public function testSend()
123+
{
124+
$queryBuilder = $this->getQueryBuilderMock();
125+
$driverConnection = $this->getDBALConnectionMock();
126+
127+
$driverConnection->expects($this->once())
128+
->method('createQueryBuilder')
129+
->willReturn($queryBuilder);
130+
131+
$queryBuilder->expects($this->once())
132+
->method('insert')
133+
->willReturn($queryBuilder);
134+
135+
$queryBuilder->expects($this->once())
136+
->method('values')
137+
->with([
138+
'body' => '?',
139+
'headers' => '?',
140+
'queue_name' => '?',
141+
'created_at' => '?',
142+
'available_at' => '?',
143+
])
144+
->willReturn($queryBuilder);
145+
146+
$queryBuilder->expects($this->once())
147+
->method('getSQL')
148+
->willReturn('INSERT');
149+
150+
$driverConnection->expects($this->once())
151+
->method('beginTransaction');
152+
153+
$driverConnection->expects($this->once())
154+
->method('executeStatement')
155+
->with('INSERT')
156+
->willReturn(1);
157+
158+
$driverConnection->expects($this->once())
159+
->method('lastInsertId')
160+
->willReturn('1');
161+
162+
$driverConnection->expects($this->once())
163+
->method('commit');
164+
165+
$connection = new Connection([], $driverConnection);
166+
$id = $connection->send('test', []);
167+
168+
self::assertSame('1', $id);
169+
}
170+
171+
public function testSendLastInsertIdReturnsInteger()
172+
{
173+
$queryBuilder = $this->getQueryBuilderMock();
174+
$driverConnection = $this->getDBALConnectionMock();
175+
176+
$driverConnection->expects($this->once())
177+
->method('createQueryBuilder')
178+
->willReturn($queryBuilder);
179+
180+
$queryBuilder->expects($this->once())
181+
->method('insert')
182+
->willReturn($queryBuilder);
183+
184+
$queryBuilder->expects($this->once())
185+
->method('values')
186+
->with([
187+
'body' => '?',
188+
'headers' => '?',
189+
'queue_name' => '?',
190+
'created_at' => '?',
191+
'available_at' => '?',
192+
])
193+
->willReturn($queryBuilder);
194+
195+
$queryBuilder->expects($this->once())
196+
->method('getSQL')
197+
->willReturn('INSERT');
198+
199+
$driverConnection->expects($this->once())
200+
->method('beginTransaction');
201+
202+
$driverConnection->expects($this->once())
203+
->method('executeStatement')
204+
->with('INSERT')
205+
->willReturn(1);
206+
207+
$driverConnection->expects($this->once())
208+
->method('lastInsertId')
209+
->willReturn(1);
210+
211+
$driverConnection->expects($this->once())
212+
->method('commit');
213+
214+
$connection = new Connection([], $driverConnection);
215+
$id = $connection->send('test', []);
216+
217+
self::assertSame('1', $id);
218+
}
219+
122220
private function getDBALConnectionMock()
123221
{
124222
$driverConnection = $this->createMock(DBALConnection::class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
1066A * For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport;
13+
14+
use Doctrine\DBAL\Configuration;
15+
use Doctrine\DBAL\Connection;
16+
use Doctrine\DBAL\DriverManager;
17+
use Doctrine\DBAL\Schema\AbstractSchemaManager;
18+
use Doctrine\DBAL\Schema\DefaultSchemaManagerFactory;
19+
use Doctrine\DBAL\Tools\DsnParser;
20+
use PHPUnit\Framework\TestCase;
21+
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
22+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
23+
24+
/**
25+
* This tests using PostgreSqlConnection with PgBouncer between pgsql and the application.
26+
*
27+
* @requires extension pdo_pgsql
28+
*
29+
* @group integration
30+
*/
31+
class DoctrinePostgreSqlPgbouncerIntegrationTest extends TestCase
32+
{
33+
private Connection $driverConnection;
34+
private PostgreSqlConnection $connection;
35+
36+
public function testSendAndGetWithAutoSetupEnabledAndNotSetupAlready()
37+
{
38+
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
39+
40+
$encoded = $this->connection->get();
41+
$this->assertSame('{"message": "Hi"}', $encoded['body']);
42+
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);
43+
44+
$this->assertNull($this->connection->get());
45+
}
46+
47+
public function testSendAndGetWithAutoSetupEnabledAndSetupAlready()
48+
{
49+
$this->connection->setup();
50+
51+
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
52+
53+
$encoded = $this->connection->get();
54+
$this->assertSame('{"message": "Hi"}', $encoded['body']);
55+
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);
56+
57+
$this->assertNull($this->connection->get());
58+
}
59+
60+
protected function setUp(): void
61+
{
62+
if (!$host = getenv('PGBOUNCER_HOST')) {
63+
$this->markTestSkipped('Missing PGBOUNCER_HOST env variable');
64+
}
65+
66+
$url = "pdo-pgsql://postgres:password@$host";
67+
$params = class_exists(DsnParser::class) ? (new DsnParser())->parse($url) : ['url' => $url];
68+
$config = new Configuration();
69+
if (class_exists(DefaultSchemaManagerFactory::class)) {
70+
$config->setSchemaManagerFactory(new DefaultSchemaManagerFactory());
71+
}
72+
73+
$this->driverConnection = DriverManager::getConnection($params, $config);
74+
$this->connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $this->driverConnection);
75+
}
76+
77+
protected function tearDown(): void
78+
{
79+
$this->createSchemaManager()->dropTable('queue_table');
80+
$this->driverConnection->close();
81+
}
82+
83+
private function createSchemaManager(): AbstractSchemaManager
84+
{
85+
return method_exists($this->driverConnection, 'createSchemaManager')
86+
? $this->driverConnection->createSchemaManager()
87+
: $this->driverConnection->getSchemaManager();
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport;
13+
14+
use Doctrine\DBAL\Configuration;
15+
use Doctrine\DBAL\DriverManager;
16+
use Doctrine\DBAL\Schema\AbstractSchemaManager;
17+
use Doctrine\DBAL\Schema\DefaultSchemaManagerFactory;
18+
use Doctrine\DBAL\Tools\DsnParser;
19+
use PHPUnit\Framework\TestCase;
20+
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
21+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
22+
23+
/**
24+
* This tests a using Doctrine PostgreSql connection without using PostgreSqlConnection
25+
* that gets used when use_notify is enabled.
26+
*
27+
* @requires extension pdo_pgsql
28+
*
29+
* @group integration
30+
*/
31+
class DoctrinePostgreSqlRegularIntegrationTest extends TestCase
32+
{
33+
private \Doctrine\DBAL\Connection $driverConnection;
34+
private Connection $connection;
35+
36+
public function testSendAndGetWithAutoSetupEnabledAndNotSetupAlready()
37+
{
38+
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
39+
40+
$encoded = $this->connection->get();
41+
$this->assertSame('{"message": "Hi"}', $encoded['body']);
42+
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);
43+
44+
$this->assertNull($this->connection->get());
45+
}
46+
47+
public function testSendAndGetWithAutoSetupEnabledAndSetupAlready()
48+
{
49+
$this->connection->setup();
50+
51+
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
52+
53+
$encoded = $this->connection->get();
54+
$this->assertSame('{"message": "Hi"}', $encoded['body']);
55+
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);
56+
57+
$this->assertNull($this->connection->get());
58+
}
59+
60+
protected function setUp(): void
61+
{
62+
if (!$host = getenv('POSTGRES_HOST')) {
63+
$this->markTestSkipped('Missing POSTGRES_HOST env variable');
64+
}
65+
66+
$url = "pdo-pgsql://postgres:password@$host";
67+
$params = class_exists(DsnParser::class) ? (new DsnParser())->parse($url) : ['url' => $url];
68+
$config = new Configuration();
69+
if (class_exists(DefaultSchemaManagerFactory::class)) {
70+
$config->setSchemaManagerFactory(new DefaultSchemaManagerFactory());
71+
}
72+
73+
$this->driverConnection = DriverManager::getConnection($params, $config);
74+
$this->connection = new Connection(['table_name' => 'queue_table'], $this->driverConnection);
75+
}
76+
77+
protected function tearDown(): void
78+
{
79+
$this->createSchemaManager()->dropTable('queue_table');
80+
$this->driverConnection->close();
81+
}
82+
83+
private function createSchemaManager(): AbstractSchemaManager
84+
{
85+
return method_exists($this->driverConnection, 'createSchemaManager')
86+
? $this->driverConnection->createSchemaManager()
87+
: $this->driverConnection->getSchemaManager();
88+
}
89+
}

0 commit comments

Comments
 (0)
0