8000 feature #35485 [Messenger] Add support for PostgreSQL LISTEN/NOTIFY (… · symfony/symfony@e3fa80a · GitHub
[go: up one dir, main page]

Skip to content

Commit e3fa80a

Browse files
committed
feature #35485 [Messenger] Add support for PostgreSQL LISTEN/NOTIFY (dunglas)
This PR was squashed before being merged into the 5.1-dev branch (closes #35485). Discussion ---------- [Messenger] Add support for PostgreSQL LISTEN/NOTIFY | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes <!-- please update src/**/CHANGELOG.md files --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tickets | n/a | License | MIT | Doc PR | todo PostgreSQL comes with a builtin, performant, scalable and transactional pub/sub system called [`LISTEN`/`NOTIFY`](https://www.postgresql.org/docs/current/sql-notify.html). This PR allows to leverage this mechanism when using the Messenger component with Postgres. When the Postgres is used, workers are notified in real-time when a message is dispatched. This reduces the latency, and prevents the worker from executing useless SQL queries. Basically, it allows to switch from a polling-based approach to an event-based one. This patch can be used with all existing installation of Messenger, as long as the underlying DBMS is Postgres. For many (most ?) projects, it allows to get the benefits of using a queue system such as RabbitMQ or Pulsar without having to introduce new services to monitor, replicate or upgrade. If PostgreSQL is used, `LISTEN`/`NOTIFY` is used automatically! That's all! It's also possible to configure how long the worker must wait for new messages: ```yaml framework: messenger: transports: async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: pgsql_get_notify: true pgsql_get_notify_timeout: 500 ``` Then you can use start the workers with something like: `php bin/console messenger:consume --sleep=0` A demo app using this new feature is available in this repository: https://github.com/dunglas/demo-postgres-listen-notify TODO: * [ ] Add tests Commits ------- 01f33c3 [Messenger] Add support for PostgreSQL LISTEN/NOTIFY
2 parents 9613f84 + 01f33c3 commit e3fa80a

File tree

7 files changed

+206
-19
lines changed

7 files changed

+206
-19
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ CHANGELOG
55
-----
66

77
* Introduced the Doctrine bridge.
8+
* Added support for PostgreSQL `LISTEN`/`NOTIFY`.

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportFactoryTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
1919
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
2020
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory;
21+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2223

2324
class DoctrineTransportFactoryTest extends TestCase
@@ -49,7 +50,7 @@ public function testCreateTransport()
4950
$serializer = $this->createMock(SerializerInterface::class);
5051

5152
$this->assertEquals(
52-
new DoctrineTransport(new Connection(Connection::buildConfiguration('doctrine://default'), $driverConnection), $serializer),
53+
new DoctrineTransport(new Connection(PostgreSqlConnection::buildConfiguration('doctrine://default'), $driverConnection), $serializer),
5354
$factory->createTransport('doctrine://default', [], $serializer)
5455
);
5556
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport;
13+
14+
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
17+
18+
/**
19+
* @author Kévin Dunglas <dunglas@gmail.com>
20+
*/
21+
class PostgreSqlConnectionTest extends TestCase
22+
{
23+
public function testSerialize()
24+
{
25+
$this->expectException(\BadMethodCallException::class);
26+
$this->expectExceptionMessage('Cannot serialize '.PostgreSqlConnection::class);
27+
28+
$schemaSynchronizer = $this->createMock(SchemaSynchronizer::class);
29+
$driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class);
30+
31+
$connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer);
32+
serialize($connection);
33+
}
34+
35+
public function testUnserialize()
36+
{
37+
$this->expectException(\BadMethodCallException::class);
38+
$this->expectExceptionMessage('Cannot unserialize '.PostgreSqlConnection::class);
39+
40+
$schemaSynchronizer = $this->createMock(SchemaSynchronizer::class);
41+
$driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class);
42+
43+
$connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer);
44+
$connection->__wakeup();
45+
}
46+
}

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@
2222
use Doctrine\DBAL\Types\Type;
2323
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
2424
use Symfony\Component\Messenger\Exception\TransportException;
25+
use Symfony\Contracts\Service\ResetInterface;
2526

2627
/**
27-
* @author Vincent Touzet <vincent.touzet@gmail.com>
28+
* @internal since Symfony 5.1
2829
*
29-
* @final
30+
* @author Vincent Touzet <vincent.touzet@gmail.com>
31+
* @author Kévin Dunglas <dunglas@gmail.com>
3032
*/
31-
class Connection
33+
class Connection implements ResetInterface
3234
{
33-
private const DEFAULT_OPTIONS = [
35+
protected const DEFAULT_OPTIONS = [
3436
'table_name' => 'messenger_messages',
3537
'queue_name' => 'default',
3638
'redeliver_timeout' => 3600,
@@ -45,22 +47,28 @@ class Connection
4547
* * table_name: name of the table
4648
* * connection: name of the Doctrine's entity manager
4749
* * queue_name: name of the queue
48-
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
49-
* * auto_setup: Whether the table should be created automatically during send / get. Default : true
50+
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
51+
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
5052
*/
51-
private $configuration = [];
52-
private $driverConnection;
53+
protected $configuration = [];
54+
protected $driverConnection;
55+
protected $queueEmptiedAt;
5356
private $schemaSynchronizer;
5457
private $autoSetup;
5558

5659
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
5760
{
58-
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
61+
$this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration);
5962
$this->driverConnection = $driverConnection;
6063
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
6164
$this->autoSetup = $this->configuration['auto_setup'];
6265
}
6366

67+
public function reset()
68+
{
69+
$this->queueEmptiedAt = null;
70+
}
71+
6472
public function getConfiguration(): array
6573
{
6674
return $this->configuration;
@@ -78,20 +86,20 @@ public static function buildConfiguration(string $dsn, array $options = []): arr
7886
}
7987

8088
$configuration = ['connection' => $components['host']];
81-
$configuration += $options + $query + self::DEFAULT_OPTIONS;
89+
$configuration += $options + $query + static::DEFAULT_OPTIONS;
8290

8391
$configuration['auto_setup'] = filter_var($configuration['auto_setup'], FILTER_VALIDATE_BOOLEAN);
8492

8593
// check for extra keys in options
86-
$optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
94+
$optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS));
8795
if (0 < \count($optionsExtraKeys)) {
88-
throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
96+
throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
8997
}
9098

9199
// check for extra keys in options
92-
$queryExtraKeys = array_diff(array_keys($query), array_keys(self::DEFAULT_OPTIONS));
100+
$queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS));
93101
if (0 < \count($queryExtraKeys)) {
94-
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
102+
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
95103
}
96104

97105
return $configuration;
@@ -154,9 +162,13 @@ public function get(): ?array
154162

155163
if (false === $doctrineEnvelope) {
156164
$this->driverConnection->commit();
165+
$this->queueEmptiedAt = microtime(true) * 1000;
157166

158167
return null;
159168
}
169+
// Postgres can "group" notifications having the same channel and payload
170+
// We need to be sure to empty the queue before blocking again
171+
$this->queueEmptiedAt = null;
160172

161173
$doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);
162174

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransportFactory.php

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,22 @@ public function __construct($registry)
3636

3737
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
3838
{
39-
unset($options['transport_name']);
40-
$configuration = Connection::buildConfiguration($dsn, $options);
39+
$useNotify = ($options['use_notify'] ?? true);
40+
unset($options['transport_name'], $options['use_notify']);
41+
// Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver when LISTEN/NOTIFY isn't available
42+
$configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);
4143

4244
try {
4345
$driverConnection = $this->registry->getConnection($configuration['connection']);
4446
} catch (\InvalidArgumentException $e) {
4547
throw new TransportException(sprintf('Could not find Doctrine connection from Messenger DSN "%s".', $dsn), 0, $e);
4648
}
4749

48-
$connection = new Connection($configuration, $driverConnection);
50+
if ($useNotify && method_exists($driverConnection->getWrappedConnection(), 'pgsqlGetNotify')) {
51+
$connection = new PostgreSqlConnection($configuration, $driverConnection);
52+
} else {
53+
$connection = new Connection($configuration, $driverConnection);
54+
}
4955

5056
return new DoctrineTransport($connection, $serializer);
5157
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
15+
16+
/**
17+
* Uses PostgreSQL LISTEN/NOTIFY to push messages to workers.
18+
*
19+
* @internal
20+
* @final
21+
*
22+
* @author Kévin Dunglas <dunglas@gmail.com>
23+
*/
24+
class PostgreSqlConnection extends Connection
25+
{
26+
/**
27+
* * use_notify: Set to false to disable the use of LISTEN/NOTIFY. Default: true
28+
* * check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 1000
29+
* * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0.
30+
*/
31+
protected const DEFAULT_OPTIONS = parent::DEFAULT_OPTIONS + [
32+
'check_delayed_interval' => 1000,
33+
'get_notify_timeout' => 0,
34+
];
35+
36+
private $listening = false;
37+
38+
public function __sleep()
39+
{
40+
throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
41+
}
42+
43+
public function __wakeup()
44+
{
45+
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
46+
}
47+
48+
public function __destruct()
49+
{
50+
$this->unlisten();
51+
}
52+
53+
public function reset()
54+
{
55+
parent::reset();
56+
$this->unlisten();
57+
}
58+
59+
public function get(): ?array
60+
{
61+
if (null === $this->queueEmptiedAt) {
62+
return parent::get();
63+
}
64+
65+
if (!$this->listening) {
66+
// This is secure because the table name must be a valid identifier:
67+
// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
68+
$this->driverConnection->exec(sprintf('LISTEN "%s"', $this->configuration['table_name']));
69+
$this->listening = true;
70+
}
71+
72+
$notification = $this->driverConnection->getWrappedConnection()->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']);
73+
if (
74+
// no notifications, or for another table or queue
75+
(false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) &&
76+
// delayed messages
77+
(microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['check_delayed_interval'])
78+
) {
79+
return null;
80+
}
81+
82+
return parent::get();
83+
}
84+
85+
public function setup(): void
86+
{
87+
parent::setup();
88+
89+
$sql = sprintf(<<<'SQL'
90+
LOCK TABLE %1$s;
91+
-- create trigger function
92+
CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
93+
BEGIN
94+
PERFORM pg_notify('%1$s', NEW.queue_name::text);
95+
RETURN NEW;
96+
END;
97+
$$ LANGUAGE plpgsql;
98+
99+
-- register trigger
100+
DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
101+
102+
CREATE TRIGGER notify_trigger
103+
AFTER INSERT
104+
ON %1$s
105+
FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
106+
SQL
107+
, $this->configuration['table_name']);
108+
$this->driverConnection->exec($sql);
109+
}
110+
111+
private function unlisten()
112+
{
113+
if (!$this->listening) {
114+
return;
115+
}
116+
117+
$this->driverConnection->exec(sprintf('UNLISTEN "%s"', $this->configuration['table_name']));
118+
$this->listening = false;
119+
}
120+
}

src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
"php": "^7.2.5",
2020
"doctrine/dbal": "^2.6",
2121
"doctrine/persistence": "^1.3",
22-
"symfony/messenger": "^5.1"
22+
"symfony/messenger": "^5.1",
23+
"symfony/service-contracts": "^1.1|^2"
2324
},
2425
"require-dev": {
2526
"symfony/serializer": "^4.4|^5.0",

0 commit comments

Comments
 (0)
0