8000 [Messenger] Add support for PostgreSQL LISTEN/NOTIFY · symfony/symfony@0171724 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0171724

Browse files
committed
[Messenger] Add support for PostgreSQL LISTEN/NOTIFY
1 parent 07818f2 commit 0171724

File tree

2 files changed

+90
-15
lines changed

2 files changed

+90
-15
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
5.1.0
5+
-----
6+
7+
* Add support for PostgreSQL `LISTEN`/`NOTIFY`
8+
49
5.0.0
510
-----
611

src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class Connection
3535
'queue_name' => 'default',
3636
'redeliver_timeout' => 3600,
3737
'auto_setup' => true,
38+
'pgsql_get_notify' => false,
39+
'pgsql_get_notify_timeout' => 0,
3840
];
3941

4042
/**
@@ -45,20 +47,35 @@ class Connection
4547
* * table_name: name of the table
4648
* * connection: name of the Doctrine's entity manager
4749
* * queue_name: name of the queue
48-
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
49-
* * auto_setup: Whether the table should be created automatically during send / get. Default : true
50+
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
51+
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
52+
* * pgsql_get_notify: Use the LISTEN/NOTIFY feature of PostgreSQL. Default: true
53+
* * pgsql_get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0
5054
*/
5155
private $configuration = [];
5256
private $driverConnection;
5357
private $schemaSynchronizer;
5458
private $autoSetup;
59+
private $usePgsqlNotify;
60+
private $listening = false;
5561

5662
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
5763
{
5864
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
5965
$this->driverConnection = $driverConnection;
6066
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
6167
$this->autoSetup = $this->configuration['auto_setup'];
68+
$this->usePgsqlNotify = $this->configuration['pgsql_get_notify'] && method_exists($this->driverConnection->getWrappedConnection(), 'pgsqlGetNotify');
69+
}
70+
71+
public function __destruct()
72+
{
73+
$this->unlisten();
74+
}
75+
76+
public function reset()
77+
{
78+
$this->unlisten();
6279
}
6380

6481
public function getConfiguration(): array
@@ -119,25 +136,68 @@ public function send(string $body, array $headers, int $delay = 0): string
119136
'available_at' => '?',
120137
]);
121138

122-
$this->executeQuery($queryBuilder->getSQL(), [
123-
$body,
124-
json_encode($headers),
125-
$this->configuration['queue_name'],
126-
$now,
127-
$availableAt,
128-
], [
129-
null,
130-
null,
131-
null,
132-
Type::DATETIME,
133-
Type::DATETIME,
134-
]);
139+
$send = function () use ($queryBuilder, $body, $headers, $now, $availableAt) {
140+
$this->executeQuery($queryBuilder->getSQL(), [
141+
$body,
142+
json_encode($headers),
143+
$this->configuration['queue_name'],
144+
$now,
145+
$availableAt,
146+
], [
147+
null,
148+
null,
149+
null,
150+
Type::DATETIME,
151+
Type::DATETIME,
152+
]);
153+
};
154+
155+
if ($this->usePgsqlNotify) {
156+
if (0 !== $delay) {
157+
throw new InvalidArgumentException('Delays are not supported when using PostgreSQL NOTIFY, set the "pgsql_get_notify" option to false to disable this feature. You can also create a dedicated queue for delayed message not using NOTIFY.');
158+
}
159+
160+
$sendAndNotify = function () use ($send) {
161+
$this->driverConnection->beginTransaction();
162+
$send();
163+
$sth = $this->driverConnection->prepare('SELECT pg_notify(?, ?)');
164+
$sth->execute([$this->configuration['table_name'], $this->configuration['queue_name']]);
165+
$this->driverConnection->commit();
166+
};
167+
168+
try {
169+
$sendAndNotify();
170+
} catch (TableNotFoundException $e) {
171+
$this->driverConnection->rollBack();
172+
173+
// create table
174+
if ($this->autoSetup) {
175+
$this->setup();
176+
}
177+
$sendAndNotify();
178+
}
179+
} else {
180+
$send();
181+
}
135182

136183
return $this->driverConnection->lastInsertId();
137184
}
138185

139186
public function get(): ?array
140187
{
188+
if ($this->usePgsqlNotify) {
189+
if (!$this->listening) {
190+
$this->driverConnection->exec('LISTEN "'.$this->configuration['table_name'].'"'); // double quotes MUST be used, so using quote() isn't possible, this shouldn't be an issue because Doctrine doesn't quote table names anywhere.
191+
$this->listening = true;
192+
// Don't call pgsqlGetNotify() during the first iteration in case messages are already waiting in the queue
193+
} else {
194+
$notification = $this->driverConnection->getWrappedConnection()->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['pgsql_get_notify_timeout']);
195+
if (false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) {
196+
return null;
197+
}
198+
}
199+
}
200+
141201
get:
142202
$this->driverConnection->beginTransaction();
143203
try {
@@ -343,4 +403,14 @@ private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
343403

344404
return $doctrineEnvelope;
345405
}
406+
407+
private function unlisten()
408+
{
409+
if (!$this->listening) {
410+
return;
411+
}
412+
413+
$this->driverConnection->exec('UNLISTEN "'.$this->configuration['table_name'].'"'); // double quotes MUST be used, so using quote() isn't possible, this shouldn't be an issue because Doctrine doesn't quote table names anywhere.
414+
$this->listening = false;
415+
}
346416
}

0 commit comments

Comments
 (0)
0