8000 [Cache] Add Redis Relay support by ostrolucky · Pull Request #48930 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Cache] Add Redis Relay support #48930

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[redis-messenger] Add Relay support
  • Loading branch information
ostrolucky committed Jan 24, 2023
commit 327969e6b190106022b2dc08ff6b994c4696bc27
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md
8000
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

6.3
---

* Add support for Relay PHP extension for Redis

6.1
---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;

use PHPUnit\Framework\TestCase;
use Relay\Relay;
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
use Symfony\Component\Messenger\Exception\TransportException;

/**
* @requires extension redis
*
* @group time-sensitive
* @group integration
*/
Expand Down Expand Up @@ -258,7 +260,7 @@ public function testLazyCluster()

public function testLazy()
{
$redis = new \Redis();
$redis = $this->createRedisClient();
$connection = Connection::fromDsn('redis://localhost/messenger-lazy?lazy=1', [], $redis);

$connection->add('1', []);
Expand All @@ -275,7 +277,7 @@ public function testLazy()

public function testDbIndex()
{
$redis = new \Redis();
$redis = $this->createRedisClient();

Connection::fromDsn('redis://localhost/queue?dbindex=2', [], $redis);

Expand All @@ -296,7 +298,7 @@ public function testFromDsnWithMultipleHosts()

public function testJsonError()
{
$redis = new \Redis();
$redis = $this->createRedisClient();
$connection = Connection::fromDsn('redis://localhost/json-error', [], $redis);
try {
$connection->add("\xB1\x31", []);
Expand All @@ -308,7 +310,7 @@ public function testJsonError()

public function testGetNonBlocking()
{
$redis = new \Redis();
$redis = $this->createRedisClient();

$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', ['sentinel_master' => null], $redis);

Expand All @@ -321,7 +323,7 @@ public function testGetNonBlocking()

public function testGetAfterReject()
{
$redis = new \Redis();
$redis = $this->createRedisClient();
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', ['sentinel_master' => null], $redis);

$connection->add('1', []);
Expand Down Expand Up @@ -380,4 +382,9 @@ private function skipIfRedisClusterUnavailable()
self::markTestSkipped($e->getMessage());
}
}

protected function createRedisClient(): \Redis|Relay
{
return new \Redis();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;

use Relay\Relay;

/**
* @requires extension relay
*
* @group time-sensitive
* @group integration
*/
class RelayExtIntegrationTest extends RedisExtIntegrationTest
{
protected function createRedisClient(): \Redis|Relay
{
return new Relay();
}
}
< 9E81 tr class="js-expandable-line js-skip-tagsearch" data-position="82">
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

namespace Symfony\Component\Messenger\Bridge\Redis\Transport;

use Relay\Relay;
use Relay\Sentinel;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\TransportException;
Expand Down Expand Up @@ -43,7 +45,7 @@ class Connection
'claim_interval' => 60000, // Interval by which pending/abandoned messages should be checked
'lazy' => false,
'auth' => null,
'serializer' => \Redis::SERIALIZER_PHP,
'serializer' => 1, // see \Redis::SERIALIZER_PHP,
'sentinel_master' => null, // String, master to look for (optional, default is NULL meaning Sentinel support is disabled)
'timeout' => 0.0, // Float, value in seconds (optional, default is 0 meaning unlimited)
'read_timeout' => 0.0, // Float, value in seconds (optional, default is 0 meaning unlimited)
Expand All @@ -52,7 +54,7 @@ class Connection
'ssl' => null, // see https://php.net/context.ssl
];

private \Redis|\RedisCluster|\Closure $redis;
private \Redis|Relay|\RedisCluster|\Closure $redis;
private string $stream;
private string $queue;
private string $group;
Expand All @@ -66,7 +68,7 @@ class Connection
private bool $deleteAfterReject;
private bool $couldHavePendingMessages = true;

public function __construct(array $options, \Redis|\RedisCluster $redis = null)
public function __construct(array $options, \Redis|Relay|\RedisCluster $redis = null)
{
if (version_compare(phpversion('redis'), '4.3.0', '<')) {
throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
Expand All @@ -78,8 +80,8 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
$auth = $options['auth'];
$sentinelMaster = $options['sentinel_master'];

if (null !== $sentinelMaster && !class_exists(\RedisSentinel::class)) {
throw new InvalidArgumentException('Redis Sentinel support requires the "redis" extension v5.2 or higher.');
if (null !== $sentinelMaster && !class_exists(\RedisSentinel::class) && !class_exists(Sentinel::class)) {
throw new InvalidArgumentException('Redis Sentinel support requires ext-redis>=5.2, or ext-relay.');
}

if (null !== $sentinelMaster && ($redis instanceof \RedisCluster || \is_array($host))) {
Expand All @@ -91,7 +93,8 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
$this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
} else {
if (null !== $sentinelMaster) {
$sentinelClient = new \RedisSentinel($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
$sentinelClient = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);

if (!$address = $sentinelClient->getMasterAddrByName($sentinelMaster)) {
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from master name "%s" and address "%s:%d".', $sentinelMaster, $host, $port));
Expand All @@ -100,7 +103,7 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
[$host, $port] = $address;
}

$this->redis = static fn () => self::initializeRedis($redis ?? new \Redis(), $host, $port, $auth, $options);
$this->redis = static fn () => self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options);
}

if (!$options['lazy']) {
Expand Down Expand Up @@ -128,12 +131,12 @@ public function __construct(array $options, \Redis|\RedisCluster $redis = null)
/**
* @param string|string[]|null $auth
*/
private static function initializeRedis(\Redis $redis, string $host, int $port, string|array|null $auth, array $params): \Redis
private static function initializeRedis(\Redis|Relay $redis, string $host, int $port, string|array|null $auth, array $params): \Redis|Relay
{
$connect = isset($params['persistent_id']) ? 'pconnect' : 'connect';
$redis->{$connect}($host, $port, $params['timeout'], $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...\defined('Redis::SCAN_PREFIX') ? [['stream' => $params['ssl'] ?? null]] : []);
$redis->{$connect}($host, $port, $params['timeout'], $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...(\defined('Redis::SCAN_PREFIX') || \extension_loaded('relay')) ? [['stream' => $params['ssl'] ?? null]] : []);

$redis->setOption(\Redis::OPT_SERIALIZER, $params['serializer']);
$redis->setOption($redis instanceof \Redis ? \Redis::OPT_SERIALIZER : Relay::OPT_SERIALIZER, $params['serializer']);

if (null !== $auth && !$redis->auth($auth)) {
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
Expand All @@ -157,7 +160,7 @@ private static function initializeRedisCluster(?\RedisCluster $redis, array $hos
return $redis;
}

public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = [], \Redis|\RedisCluster $redis = null): self
public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = [], \Redis|Relay|\RedisCluster $redis = null): self
{
if (!str_contains($dsn, ',')) {
$parsedUrl = self::parseDsn($dsn, $options);
Expand Down Expand Up @@ -265,7 +268,7 @@ private function claimOldPendingMessages()
// This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
// https://github.com/antirez/redis/issues/6256
$pendingMessages = $this->getRedis()->xpending($this->stream, $this->group, '-', '+', 1);
} catch (\RedisException $e) {
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

Expand Down Expand Up @@ -294,7 +297,7 @@ private function claimOldPendingMessages()
);

$this->couldHavePendingMessages = true;
} catch (\RedisException $e) {
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
Expand Down Expand Up @@ -352,7 +355,7 @@ public function get(): ?array
[$this->stream => $messageId],
1
);
} catch (\RedisException $e) {
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

Expand Down Expand Up @@ -390,7 +393,7 @@ public function ack(string $id): void
if ($this->deleteAfterAck) {
$acknowledged = $redis->xdel($this->stream, [$id]);
}
} catch (\RedisException $e) {
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

Expand All @@ -411,7 +414,7 @@ public function reject(string $id): void
if ($this->deleteAfterReject) {
$deleted = $redis->xdel($this->stream, [$id]) && $deleted;
}
} catch (\RedisException $e) {
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

Expand Down Expand Up @@ -474,7 +477,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): string

$id = $added;
}
} catch (\RedisException $e) {
} catch (\RedisException|\Relay\Exception $e) {
if ($error = $redis->getLastError() ?: null) {
$redis->clearLastError();
}
Expand All @@ -497,7 +500,7 @@ public function setup(): void

try {
$redis->xgroup('CREATE', $this->stream, $this->group, 0, true);
} catch (\RedisException $e) {
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

Expand Down Expand Up @@ -600,7 +603,7 @@ private function rawCommand(string $command, ...$arguments): mixed
} else {
$result = $redis->rawCommand($command, $this->queue, ...$arguments);
}
} catch (\RedisException $e) {
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

Expand All @@ -614,7 +617,7 @@ private function rawCommand(string $command, ...$arguments): mixed
return $result;
}

private function getRedis(): \Redis|\RedisCluster
private function getRedis(): \Redis|Relay|\RedisCluster
{
if ($this->redis instanceof \Closure) {
$this->redis = ($this->redis)();
Expand Down
0