You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
+85-15Lines changed: 85 additions & 15 deletions
Original file line number
Diff line number
Diff line change
@@ -35,6 +35,8 @@ class Connection
35
35
'queue_name' => 'default',
36
36
'redeliver_timeout' => 3600,
37
37
'auto_setup' => true,
38
+
'pgsql_get_notify' => false,
39
+
'pgsql_get_notify_timeout' => 0,
38
40
];
39
41
40
42
/**
@@ -45,20 +47,35 @@ class Connection
45
47
* * table_name: name of the table
46
48
* * connection: name of the Doctrine's entity manager
47
49
* * queue_name: name of the queue
48
-
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
49
-
* * auto_setup: Whether the table should be created automatically during send / get. Default : true
50
+
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
51
+
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
52
+
* * pgsql_get_notify: Use the LISTEN/NOTIFY feature of PostgreSQL. Default: true
53
+
* * pgsql_get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0
@@ -119,25 +136,68 @@ public function send(string $body, array $headers, int $delay = 0): string
119
136
'available_at' => '?',
120
137
]);
121
138
122
-
$this->executeQuery($queryBuilder->getSQL(), [
123
-
$body,
124
-
json_encode($headers),
125
-
$this->configuration['queue_name'],
126
-
$now,
127
-
$availableAt,
128
-
], [
129
-
null,
130
-
null,
131
-
null,
132
-
Type::DATETIME,
133
-
Type::DATETIME,
134
-
]);
139
+
$send = function () use ($queryBuilder, $body, $headers, $now, $availableAt) {
140
+
$this->executeQuery($queryBuilder->getSQL(), [
141
+
$body,
142
+
json_encode($headers),
143
+
$this->configuration['queue_name'],
144
+
$now,
145
+
$availableAt,
146
+
], [
147
+
null,
148
+
null,
149
+
null,
150
+
Type::DATETIME,
151
+
Type::DATETIME,
152
+
]);
153
+
};
154
+
155
+
if ($this->usePgsqlNotify) {
156
+
if (0 !== $delay) {
157
+
thrownewInvalidArgumentException('Delays are not supported when using PostgreSQL NOTIFY, set the "pgsql_get_notify" option to false to disable this feature. You can also create a dedicated queue for delayed message not using NOTIFY.');
$this->driverConnection->exec('LISTEN "'.$this->configuration['table_name'].'"'); // double quotes MUST be used, so using quote() isn't possible, this shouldn't be an issue because Doctrine doesn't quote table names anywhere.
191
+
$this->listening = true;
192
+
// Don't call pgsqlGetNotify() during the first iteration in case messages are already waiting in the queue
@@ -343,4 +403,14 @@ private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
343
403
344
404
return$doctrineEnvelope;
345
405
}
406
+
407
+
privatefunctionunlisten()
408
+
{
409
+
if (!$this->listening) {
410
+
return;
411
+
}
412
+
413
+
$this->driverConnection->exec('UNLISTEN "'.$this->configuration['table_name'].'"'); // double quotes MUST be used, so using quote() isn't possible, this shouldn't be an issue because Doctrine doesn't quote table names anywhere.
0 commit comments