27
27
/**
28
28
* @author Vincent Touzet <vincent.touzet@gmail.com>
29
29
* @author Kévin Dunglas <dunglas@gmail.com>
30
- *
31
- * @final
32
30
*/
33
31
class Connection implements ResetInterface
34
32
{
35
- private const DEFAULT_OPTIONS = [
33
+ protected const DEFAULT_OPTIONS = [
36
34
'table_name ' => 'messenger_messages ' ,
37
35
'queue_name ' => 'default ' ,
38
36
'redeliver_timeout ' => 3600 ,
39
37
'auto_setup ' => true ,
40
- 'pgsql_get_notify ' => true ,
41
- 'pgsql_get_notify_check_delayed_interval ' => 1000 ,
42
- 'pgsql_get_notify_timeout ' => 0 ,
43
38
];
44
39
45
40
/**
@@ -52,45 +47,24 @@ class Connection implements ResetInterface
52
47
* * queue_name: name of the queue
53
48
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
54
49
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
55
- * * pgsql_get_notify: Use the LISTEN/NOTIFY feature of PostgreSQL. Default: true
56
- * * pgsql_get_notify_check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 1000
57
- * * pgsql_get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0
58
50
*/
59
- private $ configuration = [];
60
- private $ driverConnection ;
51
+ protected $ configuration = [];
52
+ protected $ driverConnection ;
53
+ protected $ queueEmptiedAt ;
61
54
private $ schemaSynchronizer ;
62
55
private $ autoSetup ;
63
- private $ usePgsqlNotify ;
64
- private $ listening = false ;
65
- private $ queueEmptiedAt ;
66
56
67
57
public function __construct (array $ configuration , DBALConnection $ driverConnection , SchemaSynchronizer $ schemaSynchronizer = null )
68
58
{
69
- $ this ->configuration = array_replace_recursive (self ::DEFAULT_OPTIONS , $ configuration );
59
+ $ this ->configuration = array_replace_recursive (static ::DEFAULT_OPTIONS , $ configuration );
70
60
$ this ->driverConnection = $ driverConnection ;
71
61
$ this ->schemaSynchronizer = $ schemaSynchronizer ?? new SingleDatabaseSynchronizer ($ this ->driverConnection );
72
62
$ this ->autoSetup = $ this ->configuration ['auto_setup ' ];
73
- $ this ->usePgsqlNotify = $ this ->configuration ['pgsql_get_notify ' ] && method_exists ($ this ->driverConnection ->getWrappedConnection (), 'pgsqlGetNotify ' );
74
- }
75
-
76
- public function __sleep ()
77
- {
78
- throw new \BadMethodCallException ('Cannot serialize ' .__CLASS__ );
79
- }
80
-
81
- public function __wakeup ()
82
- {
83
- throw new \BadMethodCallException ('Cannot unserialize ' .__CLASS__ );
84
- }
85
-
86
- public function __destruct ()
87
- {
88
- $ this ->unlisten ();
89
63
}
90
64
91
65
public function reset ()
92
66
{
93
- $ this ->unlisten () ;
67
+ $ this ->queueEmptiedAt = null ;
94
68
}
95
69
96
70
public function getConfiguration (): array
@@ -110,20 +84,20 @@ public static function buildConfiguration(string $dsn, array $options = []): arr
110
84
}
111
85
112
86
$ configuration = ['connection ' => $ components ['host ' ]];
113
- $ configuration += $ options + $ query + self ::DEFAULT_OPTIONS ;
87
+ $ configuration += $ options + $ query + static ::DEFAULT_OPTIONS ;
114
88
115
89
$ configuration ['auto_setup ' ] = filter_var ($ configuration ['auto_setup ' ], FILTER_VALIDATE_BOOLEAN );
116
90
117
91
// check for extra keys in options
118
- $ optionsExtraKeys = array_diff (array_keys ($ options ), array_keys (self ::DEFAULT_OPTIONS ));
92
+ $ optionsExtraKeys = array_diff (array_keys ($ options ), array_keys (static ::DEFAULT_OPTIONS ));
119
93
if (0 < \count ($ optionsExtraKeys )) {
120
- throw new InvalidArgumentException (sprintf ('Unknown option found : [%s]. Allowed options are [%s] ' , implode (', ' , $ optionsExtraKeys ), implode (', ' , self ::DEFAULT_OPTIONS )));
94
+ throw new InvalidArgumentException (sprintf ('Unknown option found : [%s]. Allowed options are [%s] ' , implode (', ' , $ optionsExtraKeys ), implode (', ' , static ::DEFAULT_OPTIONS )));
121
95
}
122
96
123
97
// check for extra keys in options
124
- $ queryExtraKeys = array_diff (array_keys ($ query ), array_keys (self ::DEFAULT_OPTIONS ));
98
+ $ queryExtraKeys = array_diff (array_keys ($ query ), array_keys (static ::DEFAULT_OPTIONS ));
125
99
if (0 < \count ($ queryExtraKeys )) {
126
- throw new InvalidArgumentException (sprintf ('Unknown option found in DSN: [%s]. Allowed options are [%s] ' , implode (', ' , $ queryExtraKeys ), implode (', ' , self ::DEFAULT_OPTIONS )));
100
+ throw new InvalidArgumentException (sprintf ('Unknown option found in DSN: [%s]. Allowed options are [%s] ' , implode (', ' , $ queryExtraKeys ), implode (', ' , static ::DEFAULT_OPTIONS )));
127
101
}
128
102
129
103
return $ configuration ;
@@ -170,25 +144,6 @@ public function send(string $body, array $headers, int $delay = 0): string
170
144
171
145
public function get (): ?array
172
146
{
173
- if ($ this ->usePgsqlNotify && null !== $ this ->queueEmptiedAt ) {
174
- if (!$ this ->listening ) {
175
- // This is secure because the table name must be a valid identifier:
176
- // https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
177
- $ this ->driverConnection ->exec (sprintf ('LISTEN "%s" ' , $ this ->configuration ['table_name ' ]));
178
- $ this ->listening = true ;
179
- }
180
-
181
- $ notification = $ this ->driverConnection ->getWrappedConnection ()->pgsqlGetNotify (\PDO ::FETCH_ASSOC , $ this ->configuration ['pgsql_get_notify_timeout ' ]);
182
- if (
183
- // no notifications, or for another table or queue
184
- (false === $ notification || $ notification ['message ' ] !== $ this ->configuration ['table_name ' ] || $ notification ['payload ' ] !== $ this ->configuration ['queue_name ' ]) &&
185
- // delayed messages
186
- (microtime (true ) * 1000 - $ this ->queueEmptiedAt < $ this ->configuration ['pgsql_get_notify_check_delayed_interval ' ])
187
- ) {
188
- return null ;
189
- }
190
- }
191
-
192
147
get:
193
148
$ this ->driverConnection ->beginTransaction ();
194
149
try {
@@ -240,16 +195,14 @@ public function get(): ?array
240
195
241
196
throw $ e ;
242
197
}
198
+ } public function ack (string $ id ): bool
199
+ {
200
+ try {
201
+ return $ this ->driverConnection ->delete ($ this ->configuration ['table_name ' ], ['id ' => $ id ]) > 0 ;
202
+ } catch (DBALException $ exception ) {
203
+ throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
243
204
}
244
-
245
- public function ack (string $ id ): bool
246
- {
247
- try {
248
- return $ this ->driverConnection ->delete ($ this ->configuration ['table_name ' ], ['id ' => $ id ]) > 0 ;
249
- } catch (DBALException $ exception ) {
250
- throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
251
- }
252
- }
205
+ }
253
206
254
207
public function reject (string $ id ): bool
255
208
{
@@ -282,29 +235,6 @@ public function setup(): void
282
235
$ this ->driverConnection ->getConfiguration ()->setFilterSchemaAssetsExpression ($ assetFilter );
283
236
}
284
237
285
- if ($ this ->usePgsqlNotify ) {
286
- $ sql = sprintf (<<<'SQL'
287
- LOCK TABLE %1$s;
288
- -- create trigger function
289
- CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
290
- BEGIN
291
- PERFORM pg_notify('%1$s', NEW.queue_name::text);
292
- RETURN NEW;
293
- END;
294
- $$ LANGUAGE plpgsql;
295
-
296
- -- register trigger
297
- DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
298
-
299
- CREATE TRIGGER notify_trigger
300
- AFTER INSERT
301
- ON %1$s
302
- FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
303
- SQL
304
- , $ this ->configuration ['table_name ' ]);
305
- $ this ->driverConnection ->exec ($ sql );
306
- }
307
-
308
238
$ this ->autoSetup = false ;
309
239
}
310
240
@@ -421,16 +351,5 @@ private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
421
351
422
352
return $ doctrineEnvelope ;
423
353
}
424
-
425
- private function unlisten ()
426
- {
427
- if (!$ this ->listening ) {
428
- return ;
429
- }
430
-
431
- $ this ->driverConnection ->exec (sprintf ('UNLISTEN "%s" ' , $ this ->configuration ['table_name ' ]));
432
- $ this ->listening = false ;
433
- $ this ->queueEmptiedAt = null ;
434
- }
435
354
}
436
355
class_alias (Connection::class, \Symfony \Component \Messenger \Transport \Doctrine \Connection::class);
0 commit comments