8000 [Messenger] Add support for PostgreSQL LISTEN/NOTIFY · symfony/symfony@01f33c3 · GitHub
[go: up one dir, main page]

Skip to content

Commit 01f33c3

Browse files
dunglasfabpot
authored andcommitted
[Messenger] Add support for PostgreSQL LISTEN/NOTIFY
1 parent a1e4222 commit 01f33c3

File tree

7 files changed

+206
-19
lines changed

7 files changed

+206
-19
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ CHANGELOG
55
-----
66

77
* Introduced the Doctrine bridge.
8+
* Added support for PostgreSQL `LISTEN`/`NOTIFY`.

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportFactoryTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
1919
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
2020
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory;
21+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2223

2324
class DoctrineTransportFactoryTest extends TestCase
@@ -49,7 +50,7 @@ public function testCreateTransport()
4950
$serializer = $this->createMock(SerializerInterface::class);
5051

5152
$this->assertEquals(
52-
new DoctrineTransport(new Connection(Connection::buildConfiguration('doctrine://default'), $driverConnection), $serializer),
53+
new DoctrineTransport(new Connection(PostgreSqlConnection::buildConfiguration('doctrine://default'), $driverConnection), $serializer),
5354
$factory->createTransport('doctrine://default', [], $serializer)
5455
);
5556
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport;
13+
14+
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
17+
18+
/**
19+
* @author Kévin Dunglas <dunglas@gmail.com>
20+
*/
21+
class PostgreSqlConnectionTest extends TestCase
22+
{
23+
public function testSerialize()
24+
{
25+
$this->expectException(\BadMethodCallException::class);
26+
$this->expectExceptionMessage('Cannot serialize '.PostgreSqlConnection::class);
27+
28+
$schemaSynchronizer = $this->createMock(SchemaSynchronizer::class);
29+
$driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class);
30+
31+
$connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer);
32+
serialize($connection);
33+
}
34+
35+
public function testUnserialize()
36+
{
37+
$this->expectException(\BadMethodCallException::class);
38+
$this->expectExceptionMessage('Cannot unserialize '.PostgreSqlConnection::class);
39+
40+
$schemaSynchronizer = $this->createMock(SchemaSynchronizer::class);
41+
$driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class);
42+
43+
$connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer);
44+
$connection->__wakeup();
45+
}
46+
}

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@
2222
use Doctrine\DBAL\Types\Type;
2323
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
2424
use Symfony\Component\Messenger\Exception\TransportException;
25+
use Symfony\Contracts\Service\ResetInterface;
2526

2627
/**
27-
* @author Vincent Touzet <vincent.touzet@gmail.com>
28+
* @internal since Symfony 5.1
2829
*
29-
* @final
30+
* @author Vincent Touzet <vincent.touzet@gmail.com>
31+
* @author Kévin Dunglas <dunglas@gmail.com>
3032
*/
31-
class Connection
33+
class Connection implements ResetInterface
3234
{
33-
private const DEFAULT_OPTIONS = [
35+
protected const DEFAULT_OPTIONS = [
3436
'table_name' => 'messenger_messages',
3537
'queue_name' => 'default',
3638
'redeliver_timeout' => 3600,
@@ -45,22 +47,28 @@ class Connection
4547
* * table_name: name of the table
4648
* * connection: name of the Doctrine's entity manager
4749
* * queue_name: name of the queue
48-
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
49-
* * auto_setup: Whether the table should be created automatically during send / get. Default : true
50+
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
51+
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
5052
*/
51-
private $configuration = [];
52-
private $driverConnection;
53+
protected $configuration = [];
54+
protected $driverConnection;
55+
protected $queueEmptiedAt;
5356
private $schemaSynchronizer;
5457
private $autoSetup;
5558

5659
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
5760
{
58-
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
61+
$this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration);
5962
$this->driverConnection = $driverConnection;
6063
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
6164
$this->autoSetup = $this->configuration['auto_setup'];
6265
}
6366

67+
public function reset()
68+
{
69+
$this->queueEmptiedAt = null;
70+
}
71+
6472
public function getConfiguration(): array
6573
{
6674
return $this->configuration;
@@ -78,20 +86,20 @@ public static function buildConfiguration(string $dsn, array $options = []): arr
7886
}
7987

8088
$configuration = ['connection' => $components['host']];
81-
$configuration += $options + $query + self::DEFAULT_OPTIONS;
89+
$configuration += $options + $query + static::DEFAULT_OPTIONS;
8290

8391
$configuration['auto_setup'] = filter_var($configuration['auto_setup'], FILTER_VALIDATE_BOOLEAN);
8492

8593
// check for extra keys in options
86-
$optionsExtraKeys = ar 4F6 ray_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
94+
$optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS));
8795
if (0 < \count($optionsExtraKeys)) {
88-
throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
96+
throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
8997
}
9098

9199
// check for extra keys in options
92-
$queryExtraKeys = array_diff(array_keys($query), array_keys(self::DEFAULT_OPTIONS));
100+
$queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS));
93101
if (0 < \count($queryExtraKeys)) {
94-
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
102+
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
95103
}
96104

97105
return $configuration;
@@ -154,9 +162,13 @@ public function get(): ?array
154162

155163
if (false === $doctrineEnvelope) {
156164
$this->driverConnection->commit();
165+
$this->queueEmptiedAt = microtime(true) * 1000;
157166

158167
return null;
159168
}
169+
// Postgres can "group" notifications having the same channel and payload
170+
// We need to be sure to empty the queue before blocking again
171+
$this->queueEmptiedAt = null;
160172

161173
$doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);
162174

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransportFactory.php

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,22 @@ public function __construct($registry)
3636

3737
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
3838
{
39-
unset($options['transport_name']);
40-
$configuration = Connection::buildConfiguration($dsn, $options);
39+
$useNotify = ($options['use_notify'] ?? true);
40+
unset($options['transport_name'], $options['use_notify']);
41+
// Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver when LISTEN/NOTIFY isn't available
42+
$configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);
4143

4244
try {
4345
$driverConnection = $this->registry->getConnection($configuration['connection']);
4446
} catch (\InvalidArgumentException $e) {
4547
throw new TransportException(sprintf('Could not find Doctrine connection from Messenger DSN "%s".', $dsn), 0, $e);
4648
}
4749

48-
$connection = new Connection($configuration, $driverConnection);
50+
if ($useNotify && method_exists($driverConnection->getWrappedConnection(), 'pgsqlGetNotify')) {
51+
$connection = new PostgreSqlConnection($configuration, $driverConnection);
52+
} else {
53+
$connection = new Connection($configuration, $driverConnection);
54+
}
4955

5056
return new DoctrineTransport($connection, $serializer);
5157
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
15+
16+
/**
17+
* Uses PostgreSQL LISTEN/NOTIFY to push messages to workers.
18+
*
19+
* @internal
20+
* @final
21+
*
22+
* @author Kévin Dunglas <dunglas@gmail.com>
23+
*/
24+
class PostgreSqlConnection extends Connection
25+
{
26+
/**
27+
* * use_notify: Set to false to disable the use of LISTEN/NOTIFY. Default: true
28+
* * check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 1000
29+
* * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0.
30+
*/
31+
protected const DEFAULT_OPTIONS = parent::DEFAULT_OPTIONS + [
32+
'check_delayed_interval' => 1000,
33+
'get_notify_timeout' => 0,
34+
];
35+
36+
private $listening = false;
37+
38+
public function __sleep()
39+
{
40+
throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
41+
}
42+
43+
public function __wakeup()
44+
{
45+
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
46+
}
47+
48+
public function __destruct()
49+
{
50+
$this->unlisten();
51+
}
52+
53+
public function reset()
54+
{
55+
parent::reset();
56+
$this->unlisten();
57+
}
58+
59+
public function get(): ?array
60+
{
61+
if (null === $this->queueEmptiedAt) {
62+
return parent::get();
63+
}
64+
65+
if (!$this->listening) {
66+
// This is secure because the table name must be a valid identifier:
67+
// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
68+
$this->driverConnection->exec(sprintf('LISTEN "%s"', $this->configuration['table_name']));
69+
$this->listening = true;
70+
}
71+
72+
$notification = $this->driverConnection->getWrappedConnection()->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']);
73+
if (
74+
// no notifications, or for another table or queue
75+
(false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) &&
76+
// delayed messages
77+
(microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['check_delayed_interval'])
78+
) {
79+
return null;
80+
}
81+
82+
return parent::get();
83+
}
84+
85+
public function setup(): void
86+
{
87+
parent::setup();
88+
89+
$sql = sprintf(<<<'SQL'
90+
LOCK TABLE %1$s;
91+
-- create trigger function
92+
CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
93+
BEGIN
94+
PERFORM pg_notify('%1$s', NEW.queue_name::text);
95+
RETURN NEW;
96+
END;
97+
$$ LANGUAGE plpgsql;
98+
99+
-- register trigger
100+
DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
101+
102+
CREATE TRIGGER notify_trigger
103+
AFTER INSERT
104+
ON %1$s
105+
FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
106+
SQL
107+
, $this->configuration['table_name']);
108+
$this->driverConnection->exec($sql);
109+
}
110+
111+
private function unlisten()
112+
{
113+
if (!$this->listening) {
114+
return;
115+
}
116+
117+
$this->driverConnection->exec(sprintf('UNLISTEN "%s"', $this->configuration['table_name']));
118+
$this->listening = false;
119+
}
120+
}

src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
"php": "^7.2.5",
2020
"doctrine/dbal": "^2.6",
2121
"doctrine/persistence": "^1.3",
22-
"symfony/messenger": "^5.1"
22+
"symfony/messenger": "^5.1",
23+
"symfony/service-contracts": "^1.1|^2"
2324
},
2425
"require-dev": {
2526
"symfony/serializer": "^4.4|^5.0",

0 commit comments

Comments
 (0)
0