10000 Isolate PostgreSQL-specific code in a dedicated class · symfony/symfony@a131f83 · GitHub
[go: up one dir, main page]

Skip to content

Commit a131f83

Browse files
committed
Isolate PostgreSQL-specific code in a dedicated class
1 parent dc8851d commit a131f83

File tree

4 files changed

+136
-103
lines changed

4 files changed

+136
-103
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportFactoryTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
1919
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
2020
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory;
21+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2223

2324
class DoctrineTransportFactoryTest extends TestCase
@@ -49,7 +50,7 @@ public function testCreateTransport()
4950
$serializer = $this->createMock(SerializerInterface::class);
5051

5152
$this->assertEquals(
52-
new DoctrineTransport(new Connection(Connection::buildConfiguration('doctrine://default'), $driverConnection), $serializer),
53+
new DoctrineTransport(new Connection(PostgreSqlConnection::buildConfiguration('doctrine://default'), $driverConnection), $serializer),
5354
$factory->createTransport('doctrine://default', [], $serializer)
5455
);
5556
}

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

Lines changed: 18 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,14 @@
2727
/**
2828
* @author Vincent Touzet <vincent.touzet@gmail.com>
2929
* @author Kévin Dunglas <dunglas@gmail.com>
30-
*
31-
* @final
3230
*/
3331
class Connection implements ResetInterface
3432
{
35-
private const DEFAULT_OPTIONS = [
33+
protected const DEFAULT_OPTIONS = [
3634
'table_name' => 'messenger_messages',
3735
'queue_name' => 'default',
3836
'redeliver_timeout' => 3600,
3937
'auto_setup' => true,
40-
'pgsql_get_notify' => true,
41-
'pgsql_get_notify_check_delayed_interval' => 1000,
42-
'pgsql_get_notify_timeout' => 0,
4338
];
4439

4540
/**
@@ -52,45 +47,24 @@ class Connection implements ResetInterface
5247
* * queue_name: name of the queue
5348
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
5449
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
55-
* * pgsql_get_notify: Use the LISTEN/NOTIFY feature of PostgreSQL. Default: true
56-
* * pgsql_get_notify_check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 1000
57-
* * pgsql_get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0
5850
*/
59-
private $configuration = [];
60-
private $driverConnection;
51+
protected $configuration = [];
52+
protected $driverConnection;
53+
protected $queueEmptiedAt;
6154
private $schemaSynchronizer;
6255
private $autoSetup;
63-
private $usePgsqlNotify;
64-
private $listening = false;
65-
private $queueEmptiedAt;
6656

6757
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
6858
{
69-
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
59+
$this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration);
7060
$this->driverConnection = $driverConnection;
7161
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
7262
$this->autoSetup = $this->configuration['auto_setup'];
73-
$this->usePgsqlNotify = $this->configuration['pgsql_get_notify'] && method_exists($this->driverConnection->getWrappedConnection(), 'pgsqlGetNotify');
74-
}
75-
76-
public function __sleep()
77-
{
78-
throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
79-
}
80-
81-
public function __wakeup()
82-
{
83-
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
84-
}
85-
86-
public function __destruct()
87-
{
88-
$this->unlisten();
8963
}
9064

9165
public function reset()
9266
{
93-
$this->unlisten();
67+
$this->queueEmptiedAt = null;
9468
}
9569

9670
public function getConfiguration(): array
@@ -110,20 +84,20 @@ public static function buildConfiguration(string $dsn, array $options = []): arr
11084
}
11185

11286
$configuration = ['connection' => $components['host']];
113-
$configuration += $options + $query + self::DEFAULT_OPTIONS;
87+
$configuration += $options + $query + static::DEFAULT_OPTIONS;
11488

11589
$configuration['auto_setup'] = filter_var($configuration['auto_setup'], FILTER_VALIDATE_BOOLEAN);
11690

11791
// check for extra keys in options
118-
$optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
92+
$optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS));
11993
if (0 < \count($optionsExtraKeys)) {
120-
throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', self::DEFAULT_OPTIONS)));
94+
throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', static::DEFAULT_OPTIONS)));
12195
}
12296

12397
// check for extra keys in options
124-
$queryExtraKeys = array_diff(array_keys($query), array_keys(self::DEFAULT_OPTIONS));
98+
$queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS));
12599
if (0 < \count($queryExtraKeys)) {
126-
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', self::DEFAULT_OPTIONS)));
100+
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', static::DEFAULT_OPTIONS)));
127101
}
128102

129103
return $configuration;
@@ -170,25 +144,6 @@ public function send(string $body, array $headers, int $delay = 0): string
170144

171145
public function get(): ?array
172146
{
173-
if ($this->usePgsqlNotify && null !== $this->queueEmptiedAt) {
174-
if (!$this->listening) {
175-
// This is secure because the table name must be a valid identifier:
176-
// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
177-
$this->driverConnection->exec(sprintf('LISTEN "%s"', $this->configuration['table_name']));
178-
$this->listening = true;
179-
}
180-
181-
$notification = $this->driverConnection->getWrappedConnection()->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['pgsql_get_notify_timeout']);
182-
if (
183-
// no notifications, or for another table or queue
184-
(false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) &&
185-
// delayed messages
186-
(microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['pgsql_get_notify_check_delayed_interval'])
187-
) {
188-
return null;
189-
}
190-
}
191-
192147
get:
193148
$this->driverConnection->beginTransaction();
194149
try {
@@ -240,16 +195,14 @@ public function get(): ?array
240195

241196
throw $e;
242197
}
198+
} public function ack(string $id): bool
199+
{
200+
try {
201+
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
202+
} catch (DBALException $exception) {
203+
throw new TransportException($exception->getMessage(), 0, $exception);
243204
}
244-
245-
public function ack(string $id): bool
246-
{
247-
try {
248-
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
249-
} catch (DBALException $exception) {
250-
throw new TransportException($exception->getMessage(), 0, $exception);
251-
}
252-
}
205+
}
253206

254207
public function reject(string $id): bool
255208
{
@@ -282,29 +235,6 @@ public function setup(): void
282235
$this->driverConnection->getConfiguration()->setFilterSchemaAssetsExpression($assetFilter);
283236
}
284237

285-
if ($this->usePgsqlNotify) {
286-
$sql = sprintf(<<<'SQL'
287-
LOCK TABLE %1$s;
288-
-- create trigger function
289-
CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
290-
BEGIN
291-
PERFORM pg_notify('%1$s', NEW.queue_name::text);
292-
RETURN NEW;
293-
END;
294-
$$ LANGUAGE plpgsql;
295-
296-
-- register trigger
297-
DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
298-
299-
CREATE TRIGGER notify_trigger
300-
AFTER INSERT
301-
ON %1$s
302-
FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
303-
SQL
304-
, $this->configuration['table_name']);
305-
$this->driverConnection->exec($sql);
306-
}
307-
308238
$this->autoSetup = false;
309239
}
310240

@@ -421,16 +351,5 @@ private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
421351

422352
return $doctrineEnvelope;
423353
}
424-
425-
private function unlisten()
426-
{
427-
if (!$this->listening) {
428-
return;
429-
}
430-
431-
$this->driverConnection->exec(sprintf('UNLISTEN "%s"', $this->configuration['table_name']));
432-
$this->listening = false;
433-
$this->queueEmptiedAt = null;
434-
}
435354
}
436355
class_alias(Connection::class, \Symfony\Component\Messenger\Transport\Doctrine\Connection::class);

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransportFactory.php

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,22 @@ public function __construct($registry)
3636

3737
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
3838
{
39-
unset($options['transport_name']);
40-
$configuration = Connection::buildConfiguration($dsn, $options);
39+
$useNotify = ($options['use_notify'] ?? true);
40+
unset($options['transport_name'], $options['use_notify']);
41+
// Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver when LISTEN/NOTIFY isn't available
42+
$configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);
4143

4244
try {
4345
$driverConnection = $this->registry->getConnection($configuration['connection']);
4446
} catch (\InvalidArgumentException $e) {
4547
throw new TransportException(sprintf('Could not find Doctrine connection from Messenger DSN "%s".', $dsn), 0, $e);
4648
}
4749

48-
$connection = new Connection($configuration, $driverConnection);
50+
if ($useNotify && method_exists($driverConnection->getWrappedConnection(), 'pgsqlGetNotify')) {
51+
$connection = new PostgreSqlConnection($configuration, $driverConnection);
52+
} else {
53+
$connection = new Connection($configuration, $driverConnection);
54+
}
4955

5056
return new DoctrineTransport($connection, $serializer);
5157
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
5+
6+
/**
7+
* Uses PostgreSQL LISTEN/NOTIFY to push messages to workers.
8+
*
9+
* @author Kévin Dunglas <dunglas@gmail.com>
10+
*/
11+
class PostgreSqlConnection extends Connection
12+
{
13+
/**
14+
* * use_notify: Set to false to disable the use of LISTEN/NOTIFY. Default: true
15+
* * check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 1000
16+
* * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0
17+
*/
18+
protected const DEFAULT_OPTIONS = parent::DEFAULT_OPTIONS + [
19+
'check_delayed_interval' => 1000,
20+
'get_notify_timeout' => 0,
21+
];
22+
23+
private $listening = false;
24+
25+
public function __sleep()
26+
{
27+
throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
28+
}
29+
30+
public function __wakeup()
31+
{
32+
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
33+
}
34+
35+
public function __destruct()
36+
{
37+
$this->unlisten();
38+
}
39+
40+
public function reset()
41+
{
42+
parent::reset();
43+
$this->unlisten();
44+
}
45+
46+
public function get(): ?array
47+
{
48+
if (null === $this->queueEmptiedAt) {
49+
return parent::get();
50+
}
51+
52+
if (!$this->listening) {
53+
// This is secure because the table name must be a valid identifier:
54+
// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
55+
$this->driverConnection->exec(sprintf('LISTEN "%s"', $this->configuration['table_name']));
56+
$this->listening = true;
57+
}
58+
59+
$notification = $this->driverConnection->getWrappedConnection()->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']);
60+
if (
61+
// no notifications, or for another table or queue
62+
(false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) &&
63+
// delayed messages
64+
(microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['check_delayed_interval'])
65+
) {
66+
return null;
67+
}
68+
69+
return parent::get();
70+
}
71+
72+
public function setup(): void
73+
{
74+
parent::setup();
75+
76+
$sql = sprintf(<<<'SQL'
77+
LOCK TABLE %1$s;
78+
-- create trigger function
79+
CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
80+
BEGIN
81+
PERFORM pg_notify('%1$s', NEW.queue_name::text);
82+
RETURN NEW;
83+
END;
84+
$$ LANGUAGE plpgsql;
85+
86+
-- register trigger
87+
DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
88+
89+
CREATE TRIGGER notify_trigger
90+
AFTER INSERT
91+
ON %1$s
92+
FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
93+
SQL
94+
, $this->configuration['table_name']);
95+
$this->driverConnection->exec($sql);
96+
}
97+
98+
private function unlisten()
99+
{
100+
if (!$this->listening) {
101+
return;
102+
}
103+
104+
$this->driverConnection->exec(sprintf('UNLISTEN "%s"', $this->configuration['table_name']));
105+
$this->listening = false;
106+
}
107+
}

0 commit comments

Comments
 (0)
0