8000 [Messenger] Add support for multiple Redis Sentinel hosts · symfony/symfony@3e13640 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3e13640

Browse files
committed
[Messenger] Add support for multiple Redis Sentinel hosts
1 parent 5bfce3f commit 3e13640

File tree

5 files changed

+92
-16
lines changed

5 files changed

+92
-16
lines changed

.github/workflows/integration-tests.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ jobs:
172172
env:
173173
REDIS_HOST: 'localhost:16379'
174174
REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
175-
REDIS_SENTINEL_HOSTS: 'localhost:26379 localhost:26379 localhost:26379'
175+
REDIS_SENTINEL_HOSTS: 'unreachable-host:26379 localhost:26379 localhost:26379'
176176
REDIS_SENTINEL_SERVICE: redis_sentinel
177177
MESSENGER_REDIS_DSN: redis://127.0.0.1:7006/messages
178178
MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages

src/Symfony/Component/Cache/Traits/RedisTrait.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -225,18 +225,18 @@ public static function createConnection(#[\SensitiveParameter] string $dsn, arra
225225
if (\defined('Redis::OPT_NULL_MULTIBULK_AS_NULL') && isset($params['auth'])) {
226226
$extra = [$params['auth']];
227227
}
228-
$sentinel = new $sentinelClass($host, $port, $params['timeout'], (string) $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...$extra);
229228

230229
try {
230+
$sentinel = new $sentinelClass($host, $port, $params['timeout'], (string) $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...$extra);
231231
if ($address = $sentinel->getMasterAddrByName($params['redis_sentinel'])) {
232232
[$host, $port] = $address;
233233
}
234-
} catch (\RedisException $e) {
234+
} catch (\RedisException|\Relay\Exception $redisException) {
235235
}
236236
} while (++$hostIndex < \count($hosts) && !$address);
237237

238238
if (isset($params['redis_sentinel']) && !$address) {
239-
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $params['redis_sentinel']));
239+
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $params['redis_sentinel']), previous: $redisException ?? null);
240240
}
241241

242242
try {

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php

+30
6D40
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
1313

14+
use PHPUnit\Framework\SkippedTestSuiteError;
1415
use PHPUnit\Framework\TestCase;
1516
use Relay\Relay;
1617
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage;
@@ -220,6 +221,35 @@ public function testConnectionClaimAndRedeliver()
220221
$connection->ack($message['id']);
221222
}
222223

224+
public function testSentinel()
225+
{
226+
if (!$hosts = getenv('REDIS_SENTINEL_HOSTS')) {
227+
throw new SkippedTestSuiteError('REDIS_SENTINEL_HOSTS env var is not defined.');
228+
}
229+
230+
if (!getenv('MESSENGER_REDIS_SENTINEL_MASTER')) {
231+
throw new SkippedTestSuiteError('MESSENGER_REDIS_SENTINEL_MASTER env var is not defined.');
232+
}
233+
234+
$dsn = 'redis:?host['.str_replace(' ', ']&host[', $hosts).']';
235+
236+
$connection = Connection::fromDsn($dsn,
237+
['delete_after_ack' => true,
238+
'sentinel_master' => getenv('MESSENGER_REDIS_SENTINEL_MASTER') ?: null,
239+
], $this->redis);
240+
241+
$connection->add('1', []);
242+
$this->assertNotEmpty($message = $connection->get());
243+
$this->assertSame([
244+
'message' => json_encode([
245+
'body' => '1',
246+
'headers' => [],
247+
]),
248+
], $message['data']);
249+
$connection->reject($message['id']);
250+
$connection->cleanup();
251+
}
252+
223253
public function testLazySentinel()
224254
{
225255
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'),

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

+57-12
Original file line numberDiff line numberDiff line change
@@ -84,26 +84,45 @@ public function __construct(array $options, \Redis|Relay|\RedisCluster $redis =
8484
throw new InvalidArgumentException('Redis Sentinel support requires ext-redis>=5.2, or ext-relay.');
8585
}
8686

87-
if (null !== $sentinelMaster && ($redis instanceof \RedisCluster || \is_array($host))) {
87+
if (null !== $sentinelMaster && $redis instanceof \RedisCluster) {
8888
throw new InvalidArgumentException('Cannot configure Redis Sentinel and Redis Cluster instance at the same time.');
8989
}
9090

91-
if (\is_array($host) || $redis instanceof \RedisCluster) {
91+
if ((\is_array($host) && null === $sentinelMaster) || $redis instanceof \RedisCluster) {
9292
$hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
9393
$this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
9494
} else {
95-
if (null !== $sentinelMaster) {
96-
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
97-
$sentinelClient = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);
98-
99-
if (!$address = $sentinelClient->getMasterAddrByName($sentinelMaster)) {
100-
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from master name "%s" and address "%s:%d".', $sentinelMaster, $host, $port));
95+
$this->redis = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
96+
if (null !== $sentinelMaster) {
97+
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
98+
$hostIndex = 0;
99+
$hosts = \is_array($host) ? $host : [['scheme' => 'tcp', 'host' => $host, 'port' => $port]];
100+
do {
101+
$host = $hosts[$hostIndex]['host'];
102+
$port = $hosts[$hostIndex]['port'] ?? 0;
103+
$tls = 'tls' === $hosts[$hostIndex]['scheme'];
104+
$address = false;
105+
106+
if (isset($hosts[$hostIndex]['host']) && $tls) {
107+
$host = 'tls://'.$host;
108+
}
109+
110+
try {
111+
$sentinel = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);
112+
if ($address = $sentinel->getMasterAddrByName($sentinelMaster)) {
113+
[$host, $port] = $address;
114+
}
115+
} catch (\RedisException|\Relay\Exception $redisException) {
116+
}
117+
} while (++$hostIndex < \count($hosts) && !$address);
118+
119+
if (!$address) {
120+
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $sentinelMaster), previous: $redisException ?? null);
121+
}
101122
}
102123

103-
[$host, $port] = $address;
104-
}
105-
106-
$this->redis = static fn () => self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options);
124+
return self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options);
125+
};
107126
}
108127

109128
if (!$options['lazy']) {
@@ -207,6 +226,32 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
207226
$user = '' !== ($parsedUrl['user'] ?? '') ? urldecode($parsedUrl['user']) : null;
208227
$options['auth'] ??= null !== $pass && null !== $user ? [$user, $pass] : ($pass ?? $user);
209228

229+
if (isset($parsedUrl['query'])) {
230+
parse_str($parsedUrl['query'], $query);
231+
232+
if (isset($query['host'])) {
233+
$tls = 'rediss' === $parsedUrl['scheme'];
234+
$tcpScheme = $tls ? 'tls' : 'tcp';
235+
236+
if (!\is_array($hosts = $query['host'])) {
237+
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: "%s".', $dsn));
238+
}
239+
foreach ($hosts as $host => $parameters) {
240+
if (\is_string($parameters)) {
241+
parse_str($parameters, $parameters);
242+
}
243+
if (false === $i = strrpos($host, ':')) {
244+
$hosts[$host] = ['scheme' => $tcpScheme, 'host' => $host, 'port' => 6379] + $parameters;
245+
} elseif ($port = (int) substr($host, 1 + $i)) {
246+
$hosts[$host] = ['scheme' => $tcpScheme, 'host' => substr($host, 0, $i), 'port' => $port] + $parameters;
247+
} else {
248+
$hosts[$host] = ['scheme' => 'unix', 'host' => substr($host, 0, $i)] + $parameters;
249+
}
250+
}
251+
$parsedUrl['host'] = array_values($hosts);
252+
}
253+
}
254+
210255
if (isset($parsedUrl['host'])) {
211256
$options['host'] = $parsedUrl['host'] ?? $options['host'];
212257
$options['port'] = $parsedUrl['port'] ?? $options['port'];

src/Symfony/Component/Messenger/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ CHANGELOG
66

77
* Deprecate `StopWorkerOnSignalsListener` in favor of using the `SignalableCommandInterface`
88
* Add `HandlerDescriptor::getOptions`
9+
* Add support for multiple Redis Sentinel hosts
910

1011
6.3
1112
---

0 commit comments

Comments
 (0)
0