11
11
12
12
namespace Symfony \Component \Messenger \Bridge \Redis \Transport ;
13
13
14
+ use Relay \Relay ;
15
+ use Relay \Sentinel ;
14
16
use Symfony \Component \Messenger \Exception \InvalidArgumentException ;
15
17
use Symfony \Component \Messenger \Exception \LogicException ;
16
18
use Symfony \Component \Messenger \Exception \TransportException ;
@@ -43,7 +45,7 @@ class Connection
43
45
'claim_interval ' => 60000 , // Interval by which pending/abandoned messages should be checked
44
46
'lazy ' => false ,
45
47
'auth ' => null ,
46
- 'serializer ' => \Redis::SERIALIZER_PHP ,
48
+ 'serializer ' => 1 , // see \Redis::SERIALIZER_PHP,
47
49
'sentinel_master ' => null , // String, master to look for (optional, default is NULL meaning Sentinel support is disabled)
48
50
'timeout ' => 0.0 , // Float, value in seconds (optional, default is 0 meaning unlimited)
49
51
'read_timeout ' => 0.0 , // Float, value in seconds (optional, default is 0 meaning unlimited)
@@ -52,7 +54,7 @@ class Connection
52
54
'ssl ' => null , // see https://php.net/context.ssl
53
55
];
54
56
55
- private \Redis |\RedisCluster |\Closure $ redis ;
57
+ private \Redis |Relay | \RedisCluster |\Closure $ redis ;
56
58
private string $ stream ;
57
59
private string $ queue ;
58
60
private string $ group ;
@@ -66,7 +68,7 @@ class Connection
66
68
private bool $ deleteAfterReject ;
67
69
private bool $ couldHavePendingMessages = true ;
68
70
69
- public function __construct (array $ options , \Redis |\RedisCluster $ redis = null )
71
+ public function __construct (array $ options , \Redis |Relay | \RedisCluster $ redis = null )
70
72
{
71
73
if (version_compare (phpversion ('redis ' ), '4.3.0 ' , '< ' )) {
72
74
throw new LogicException ('The redis transport requires php-redis 4.3.0 or higher. ' );
@@ -78,8 +80,8 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
78
80
$ auth = $ options ['auth ' ];
79
81
$ sentinelMaster = $ options ['sentinel_master ' ];
80
82
81
- if (null !== $ sentinelMaster && !class_exists (\RedisSentinel::class)) {
82
- throw new InvalidArgumentException ('Redis Sentinel support requires the " redis" extension v5.2 or higher . ' );
83
+ if (null !== $ sentinelMaster && !class_exists (\RedisSentinel::class) && ! class_exists (Sentinel::class) ) {
84
+ throw new InvalidArgumentException ('Redis Sentinel support requires ext- redis>=5.2, or ext-relay . ' );
83
85
}
84
86
85
87
if (null !== $ sentinelMaster && ($ redis instanceof \RedisCluster || \is_array ($ host ))) {
@@ -91,7 +93,8 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
91
93
$ this ->redis = static fn () => self ::initializeRedisCluster ($ redis , $ hosts , $ auth , $ options );
92
94
} else {
93
95
if (null !== $ sentinelMaster ) {
94
- $ sentinelClient = new \RedisSentinel ($ host , $ port , $ options ['timeout ' ], $ options ['persistent_id ' ], $ options ['retry_interval ' ], $ options ['read_timeout ' ]);
96
+ $ sentinelClass = \extension_loaded ('redis ' ) ? \RedisSentinel::class : Sentinel::class;
97
+ $ sentinelClient = new $ sentinelClass ($ host , $ port , $ options ['timeout ' ], $ options ['persistent_id ' ], $ options ['retry_interval ' ], $ options ['read_timeout ' ]);
95
98
96
99
if (!$ address = $ sentinelClient ->getMasterAddrByName ($ sentinelMaster )) {
97
100
throw new InvalidArgumentException (sprintf ('Failed to retrieve master information from master name "%s" and address "%s:%d". ' , $ sentinelMaster , $ host , $ port ));
@@ -100,7 +103,7 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
100
103
[$ host , $ port ] = $ address ;
101
104
}
102
105
103
- $ this ->redis = static fn () => self ::initializeRedis ($ redis ?? new \Redis (), $ host , $ port , $ auth , $ options );
106
+ $ this ->redis = static fn () => self ::initializeRedis ($ redis ?? ( \extension_loaded ( ' redis ' ) ? new \Redis () : new Relay () ), $ host , $ port , $ auth , $ options );
104
107
}
105
108
106
109
if (!$ options ['lazy ' ]) {
@@ -128,12 +131,12 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
128
131
/**
129
132
* @param string|string[]|null $auth
130
133
*/
131
- private static function initializeRedis (\Redis $ redis , string $ host , int $ port , string |array |null $ auth , array $ params ): \Redis
134
+ private static function initializeRedis (\Redis | Relay $ redis , string $ host , int $ port , string |array |null $ auth , array $ params ): \Redis | Relay
132
135
{
133
136
$ connect = isset ($ params ['persistent_id ' ]) ? 'pconnect ' : 'connect ' ;
134
- $ redis ->{$ connect }($ host , $ port , $ params ['timeout ' ], $ params ['persistent_id ' ], $ params ['retry_interval ' ], $ params ['read_timeout ' ], ...\defined ('Redis::SCAN_PREFIX ' ) ? [['stream ' => $ params ['ssl ' ] ?? null ]] : []);
137
+ $ redis ->{$ connect }($ host , $ port , $ params ['timeout ' ], $ params ['persistent_id ' ], $ params ['retry_interval ' ], $ params ['read_timeout ' ], ...( \defined ('Redis::SCAN_PREFIX ' ) || \extension_loaded ( ' relay ' ) ) ? [['stream ' => $ params ['ssl ' ] ?? null ]] : []);
135
138
136
- $ redis ->setOption (\Redis::OPT_SERIALIZER , $ params ['serializer ' ]);
139
+ $ redis ->setOption ($ redis instanceof \Redis ? \Redis:: OPT_SERIALIZER : Relay ::OPT_SERIALIZER , $ params ['serializer ' ]);
137
140
138
141
if (null !== $ auth && !$ redis ->auth ($ auth )) {
139
142
throw new InvalidArgumentException ('Redis connection failed: ' .$ redis ->getLastError ());
@@ -157,7 +160,7 @@ private static function initializeRedisCluster(?\RedisCluster $redis, array $hos
157
160
return $ redis ;
158
161
}
159
162
160
- public static function fromDsn (#[\SensitiveParameter] string $ dsn , array $ options = [], \Redis |\RedisCluster $ redis = null ): self
163
+ public static function fromDsn (#[\SensitiveParameter] string $ dsn , array $ options = [], \Redis |Relay | \RedisCluster $ redis = null ): self
161
164
{
162
165
if (!str_contains ($ dsn , ', ' )) {
163
166
$ parsedUrl = self ::parseDsn ($ dsn , $ options );
@@ -265,7 +268,7 @@ private function claimOldPendingMessages()
265
268
// This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
266
269
// https://github.com/antirez/redis/issues/6256
267
270
$ pendingMessages = $ this ->getRedis ()->xpending ($ this ->stream , $ this ->group , '- ' , '+ ' , 1 );
268
- } catch (\RedisException $ e ) {
271
+ } catch (\RedisException | \ Relay \ Exception $ e ) {
269
272
throw new TransportException ($ e ->getMessage (), 0 , $ e );
270
273
}
271
274
@@ -294,7 +297,7 @@ private function claimOldPendingMessages()
294
297
);
295
298
296
299
$ this ->couldHavePendingMessages = true ;
297
- } catch (\RedisException $ e ) {
300
+ } catch (\RedisException | \ Relay \ Exception $ e ) {
298
301
throw new TransportException ($ e ->getMessage (), 0 , $ e );
299
302
}
300
303
}
@@ -352,7 +355,7 @@ public function get(): ?array
352
355
[$ this ->stream => $ messageId ],
353
356
1
354
357
);
355
- } catch (\RedisException $ e ) {
358
+ } catch (\RedisException | \ Relay \ Exception $ e ) {
356
359
throw new TransportException ($ e ->getMessage (), 0 , $ e );
357
360
}
358
361
@@ -390,7 +393,7 @@ public function ack(string $id): void
390
393
if ($ this ->deleteAfterAck ) {
391
394
$ acknowledged = $ redis ->xdel ($ this ->stream , [$ id ]);
392
395
}
393
- } catch (\RedisException $ e ) {
396
+ } catch (\RedisException | \ Relay \ Exception $ e ) {
394
397
throw new TransportException ($ e ->getMessage (), 0 , $ e );
395
398
}
396
399
@@ -411,7 +414,7 @@ public function reject(string $id): void
411
414
if ($ this ->deleteAfterReject ) {
412
415
$ deleted = $ redis ->xdel ($ this ->stream , [$ id ]) && $ deleted ;
413
416
}
414
- } catch (\RedisException $ e ) {
417
+ } catch (\RedisException | \ Relay \ Exception $ e ) {
415
418
throw new TransportException ($ e ->getMessage (), 0 , $ e );
416
419
}
417
420
@@ -474,7 +477,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): string
474
477
475
478
$ id = $ added ;
476
479
}
477
- } catch (\RedisException $ e ) {
480
+ } catch (\RedisException | \ Relay \ Exception $ e ) {
478
481
if ($ error = $ redis ->getLastError () ?: null ) {
479
482
$ redis ->clearLastError ();
480
483
}
@@ -497,7 +500,7 @@ public function setup(): void
497
500
498
501
try {
499
502
$ redis ->xgroup ('CREATE ' , $ this ->stream , $ this ->group , 0 , true );
500
- } catch (\RedisException $ e ) {
503
+ } catch (\RedisException | \ Relay \ Exception $ e ) {
501
504
throw new TransportException ($ e ->getMessage (), 0 , $ e );
502
505
}
503
506
@@ -600,7 +603,7 @@ private function rawCommand(string $command, ...$arguments): mixed
600
603
} else {
601
604
$ result = $ redis ->rawCommand ($ command , $ this ->queue , ...$ arguments );
602
605
}
603
- } catch (\RedisException $ e ) {
606
+ } catch (\RedisException | \ Relay \ Exception $ e ) {
604
607
throw new TransportException ($ e<
75EB
/span>->getMessage (), 0 , $ e );
605
608
}
606
609
@@ -614,7 +617,7 @@ private function rawCommand(string $command, ...$arguments): mixed
614
617
return $ result ;
615
618
}
616
619
617
- private function getRedis (): \Redis |\RedisCluster
620
+ private function getRedis (): \Redis |Relay | \RedisCluster
618
621
{
619
622
if ($ this ->redis instanceof \Closure) {
620
623
$ this ->redis = ($ this ->redis )();
0 commit comments