8000 [redis-messenger] Add Relay support · symfony/symfony@327969e · GitHub
[go: up one dir, main page]

Skip to content

Commit 327969e

Browse files
committed
[redis-messenger] Add Relay support
1 parent 1d7b409 commit 327969e

File tree

4 files changed

+68
-25
lines changed

4 files changed

+68
-25
lines changed

src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
6.3
5+
---
6+
7+
* Add support for Relay PHP extension for Redis
8+
49
6.1
510
---
611

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Relay\Relay;
1516
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage;
1617
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
1718
use Symfony\Component\Messenger\Exception\TransportException;
1819

1920
/**
2021
* @requires extension redis
22+
*
2123
* @group time-sensitive
2224
* @group integration
2325
*/
@@ -258,7 +260,7 @@ public function testLazyCluster()
258260

259261
public function testLazy()
260262
{
261-
$redis = new \Redis();
263+
$redis = $this->createRedisClient();
262264
$connection = Connection::fromDsn('redis://localhost/messenger-lazy?lazy=1', [], $redis);
263265

264266
$connection->add('1', []);
@@ -275,7 +277,7 @@ public function testLazy()
275277

276278
public function testDbIndex()
277279
{
278-
$redis = new \Redis();
280+
$redis = $this->createRedisClient();
279281

280282
Connection::fromDsn('redis://localhost/queue?dbindex=2', [], $redis);
281283

@@ -296,7 +298,7 @@ public function testFromDsnWithMultipleHosts()
296298

297299
public function testJsonError()
298300
{
299-
$redis = new \Redis();
301+
$redis = $this->createRedisClient();
300302
$connection = Connection::fromDsn('redis://localhost/json-error', [], $redis);
301303
try {
302304
$connection->add("\xB1\x31", []);
@@ -308,7 +310,7 @@ public function testJsonError()
308310

309311
public function testGetNonBlocking()
310312
{
311-
$redis = new \Redis();
313+
$redis = $this->createRedisClient();
312314

313315
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', ['sentinel_master' => null], $redis);
314316

@@ -321,7 +323,7 @@ public function testGetNonBlocking()
321323

322324
public function testGetAfterReject()
323325
{
324-
$redis = new \Redis();
326+
$redis = $this->createRedisClient();
325327
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', ['sentinel_master' => null], $redis);
326328

327329
$connection->add('1', []);
@@ -380,4 +382,9 @@ private function skipIfRedisClusterUnavailable()
380382
self::markTestSkipped($e->getMessage());
381383
}
382384
}
385+
386+
protected function createRedisClient(): \Redis|Relay
387+
{
388+
return new \Redis();
389+
}
383390
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
13+
14+
use Relay\Relay;
15+
16+
/**
17+
* @requires extension relay
18+
*
19+
* @group time-sensitive
20+
* @group integration
21+
*/
22+
class RelayExtIntegrationTest extends RedisExtIntegrationTest
23+
{
24+
protected function createRedisClient(): \Redis|Relay
25+
{
26+
return new Relay();
27+
}
28+
}

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
1313

14+
use Relay\Relay;
15+
use Relay\Sentinel;
1416
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
1517
use Symfony\Component\Messenger\Exception\LogicException;
1618
use Symfony\Component\Messenger\Exception\TransportException;
@@ -43,7 +45,7 @@ class Connection
4345
'claim_interval' => 60000, // Interval by which pending/abandoned messages should be checked
4446
'lazy' => false,
4547
'auth' => null,
46-
'serializer' => \Redis::SERIALIZER_PHP,
48+
'serializer' => 1, // see \Redis::SERIALIZER_PHP,
4749
'sentinel_master' => null, // String, master to look for (optional, default is NULL meaning Sentinel support is disabled)
4850
'timeout' => 0.0, // Float, value in seconds (optional, default is 0 meaning unlimited)
4951
'read_timeout' => 0.0, // Float, value in seconds (optional, default is 0 meaning unlimited)
@@ -52,7 +54,7 @@ class Connection
5254
'ssl' => null, // see https://php.net/context.ssl
5355
];
5456

55-
private \Redis|\RedisCluster|\Closure $redis;
57+
private \Redis|Relay|\RedisCluster|\Closure $redis;
5658
private string $stream;
5759
private string $queue;
5860
private string $group;
@@ -66,7 +68,7 @@ class Connection
6668
private bool $deleteAfterReject;
6769
private bool $couldHavePendingMessages = true;
6870

69-
public function __construct(array $options, \Redis|\RedisCluster $redis = null)
71+
public function __construct(array $options, \Redis|Relay|\RedisCluster $redis = null)
7072
{
7173
if (version_compare(phpversion('redis'), '4.3.0', '<')) {
7274
throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
@@ -78,8 +80,8 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
7880
$auth = $options['auth'];
7981
$sentinelMaster = $options['sentinel_master'];
8082

81-
if (null !== $sentinelMaster && !class_exists(\RedisSentinel::class)) {
82-
throw new InvalidArgumentException('Redis Sentinel support requires the "redis" extension v5.2 or higher.');
83+
if (null !== $sentinelMaster && !class_exists(\RedisSentinel::class) && !class_exists(Sentinel::class)) {
84+
throw new InvalidArgumentException('Redis Sentinel support requires ext-redis>=5.2, or ext-relay.');
8385
}
8486

8587
if (null !== $sentinelMaster && ($redis instanceof \RedisCluster || \is_array($host))) {
@@ -91,7 +93,8 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
9193
$this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
9294
} else {
9395
if (null !== $sentinelMaster) {
94-
$sentinelClient = new \RedisSentinel($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);
96+
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
97+
$sentinelClient = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);
9598

9699
if (!$address = $sentinelClient->getMasterAddrByName($sentinelMaster)) {
97100
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from master name "%s" and address "%s:%d".', $sentinelMaster, $host, $port));
@@ -100,7 +103,7 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
100103
[$host, $port] = $address;
101104
}
102105

103-
$this->redis = static fn () => self::initializeRedis($redis ?? new \Redis(), $host, $port, $auth, $options);
106+
$this->redis = static fn () => self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options);
104107
}
105108

106109
if (!$options['lazy']) {
@@ -128,12 +131,12 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
128131
/**
129132
* @param string|string[]|null $auth
130133
*/
131-
private static function initializeRedis(\Redis $redis, string $host, int $port, string|array|null $auth, array $params): \Redis
134+
private static function initializeRedis(\Redis|Relay $redis, string $host, int $port, string|array|null $auth, array $params): \Redis|Relay
132135
{
133136
$connect = isset($params['persistent_id']) ? 'pconnect' : 'connect';
134-
$redis->{$connect}($host, $port, $params['timeout'], $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...\defined('Redis::SCAN_PREFIX') ? [['stream' => $params['ssl'] ?? null]] : []);
137+
$redis->{$connect}($host, $port, $params['timeout'], $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...(\defined('Redis::SCAN_PREFIX') || \extension_loaded('relay')) ? [['stream' => $params['ssl'] ?? null]] : []);
135138

136-
$redis->setOption(\Redis::OPT_SERIALIZER, $params['serializer']);
139+
$redis->setOption($redis instanceof \Redis ? \Redis::OPT_SERIALIZER : Relay::OPT_SERIALIZER, $params['serializer']);
137140

138141
if (null !== $auth && !$redis->auth($auth)) {
139142
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
@@ -157,7 +160,7 @@ private static function initializeRedisCluster(?\RedisCluster $redis, array $hos
157160
return $redis;
158161
}
159162

160-
public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = [], \Redis|\RedisCluster $redis = null): self
163+
public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = [], \Redis|Relay|\RedisCluster $redis = null): self
161164
{
162165
if (!str_contains($dsn, ',')) {
163166
$parsedUrl = self::parseDsn($dsn, $options);
@@ -265,7 +268,7 @@ private function claimOldPendingMessages()
265268
// This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
266269
// https://github.com/antirez/redis/issues/6256
267270
$pendingMessages = $this->getRedis()->xpending($this->stream, $this->group, '-', '+', 1);
268-
} catch (\RedisException $e) {
271+
} catch (\RedisException|\Relay\Exception $e) {
269272
throw new TransportException($e->getMessage(), 0, $e);
270273
}
271274

@@ -294,7 +297,7 @@ private function claimOldPendingMessages()
294297
);
295298

296299
$this->couldHavePendingMessages = true;
297-
} catch (\RedisException $e) {
300+
} catch (\RedisException|\Relay\Exception $e) {
298301
throw new TransportException($e->getMessage(), 0, $e);
299302
}
300303
}
@@ -352,7 +355,7 @@ public function get(): ?array
352355
[$this->stream => $messageId],
353356
1
354357
);
355-
} catch (\RedisException $e) {
358+
} catch (\RedisException|\Relay\Exception $e) {
356359
throw new TransportException($e->getMessage(), 0, $e);
357360
}
358361

@@ -390,7 +393,7 @@ public function ack(string $id): void
390393
if ($this->deleteAfterAck) {
391394
$acknowledged = $redis->xdel($this->stream, [$id]);
392395
}
393-
} catch (\RedisException $e) {
396+
} catch (\RedisException|\Relay\Exception $e) {
394397
throw new TransportException($e->getMessage(), 0, $e);
395398
}
396399

@@ -411,7 +414,7 @@ public function reject(string $id): void
411414
if ($this->deleteAfterReject) {
412415
$deleted = $redis->xdel($this->stream, [$id]) && $deleted;
413416
}
414-
} catch (\RedisException $e) {
417+
} catch (\RedisException|\Relay\Exception $e) {
415418
throw new TransportException($e->getMessage(), 0, $e);
416419
}
417420

@@ -474,7 +477,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): string
474477

475478
$id = $added;
476479
}
477-
} catch (\RedisException $e) {
480+
} catch (\RedisException|\Relay\Exception $e) {
478481
if ($error = $redis->getLastError() ?: null) {
479482
$redis->clearLastError();
480483
}
@@ -497,7 +500,7 @@ public function setup(): void
497500

498501
try {
499502
$redis->xgroup('CREATE', $this->stream, $this->group, 0, true);
500-
} catch (\RedisException $e) {
503+
} catch (\RedisException|\Relay\Exception $e) {
501504
throw new TransportException($e->getMessage(), 0, $e);
502505
}
503506

@@ -600,7 +603,7 @@ private function rawCommand(string $command, ...$arguments): mixed
600603
} else {
601604
$result = $redis->rawCommand($command, $this->queue, ...$arguments);
602605
}
603-
} catch (\RedisException $e) {
606+
} catch (\RedisException|\Relay\Exception $e) {
604607
throw new TransportException($e< 75EB /span>->getMessage(), 0, $e);
605608
}
606609

@@ -614,7 +617,7 @@ private function rawCommand(string $command, ...$arguments): mixed
614617
return $result;
615618
}
616619

617-
private function getRedis(): \Redis|\RedisCluster
620+
private function getRedis(): \Redis|Relay|\RedisCluster
618621
{
619622
if ($this->redis instanceof \Closure) {
620623
$this->redis = ($this->redis)();

0 commit comments

Comments
 (0)
0