8000 feature #28713 [Cache] added support for connecting to Redis clusters… · symfony/symfony@620094a · GitHub
[go: up one dir, main page]

Skip to content

Commit 620094a

Browse files
committed
feature #28713 [Cache] added support for connecting to Redis clusters via DSN (nicolas-grekas)
This PR was merged into the 4.2-dev branch. Discussion ---------- [Cache] added support for connecting to Redis clusters via DSN | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | - | License | MIT | Doc PR | - Replaces #28300 and #28175 This PR allows configuring a cluster of Redis servers using all available options of either the phpredis extension or the Predis package: - the `redis_cluster=0/1` boolean option configures whether the client should use the Redis cluster protocol; - several hosts can be provided using a syntax very 8000 similar to #28598, enabling consistent hashing distribution of keys; - `failover=error/distribute/slaves` can be set to direct reads at slave servers; - extra options are passed as is to the driver (e.g. `profile=2.8`) - Predis per-server settings are also possible, using e.g. `host[localhost][alias]=foo` in the query string, or `host[localhost]=alias%3Dfoo` (ie PHP query arrays or urlencoded key/value pairs) Commits ------- a42e877 [Cache] added support for connecting to Redis clusters via DSN
2 parents 493c13a + a42e877 commit 620094a

9 files changed

+191
-60
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999
"doctrine/doctrine-bundle": "~1.4",
100100
"monolog/monolog": "~1.11",
101101
"ocramius/proxy-manager": "~0.4|~1.0|~2.0",
102-
"predis/predis": "~1.0",
102+
"predis/predis": "~1.1",
103103
"egulias/email-validator": "~1.2,>=1.2.8|~2.0",
104104
"symfony/phpunit-bridge": "~3.4|~4.0",
105105
"symfony/security-acl": "~2.8|~3.0",

src/Symfony/Component/Cache/Adapter/AbstractAdapter.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public static function createConnection($dsn, array $options = array())
136136
if (!\is_string($dsn)) {
137137
throw new InvalidArgumentException(sprintf('The %s() method expect argument #1 to be string, %s given.', __METHOD__, \gettype($dsn)));
138138
}
139-
if (0 === strpos($dsn, 'redis://')) {
139+
if (0 === strpos($dsn, 'redis:')) {
140140
return RedisAdapter::createConnection($dsn, $options);
141141
}
142142
if (0 === strpos($dsn, 'memcached:')) {

src/Symfony/Component/Cache/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ CHANGELOG
44
4.2.0
55
-----
66

7-
* added support for configuring multiple Memcached servers in one DSN
7+
* added support for connecting to Redis clusters via DSN
8+
* added support for configuring multiple Memcached servers via DSN
89
* added `MarshallerInterface` and `DefaultMarshaller` to allow changing the serializer and provide one that automatically uses igbinary when available
910
* added `CacheInterface`, which provides stampede protection via probabilistic early expiration and should become the preferred way to use a cache
1011
* added sub-second expiry accuracy for backends that support it

src/Symfony/Component/Cache/Tests/Adapter/PredisAdapterTest.php

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,13 @@ public function testCreateConnection()
3434

3535
$params = array(
3636
'scheme' => 'tcp',
37-
'host' => $redisHost,
38-
'path' => '',
39-
'dbindex' => '1',
37+
'host' => 'localhost',
4038
'port' => 6379,
41-
'class' => 'Predis\Client',
42-
'timeout' => 3,
4339
'persistent' => 0,
44-
'persistent_id' => null,
45-
'read_timeout' => 0,
46-
'retry_interval' => 0,
47-
'compression' => true,
48-
'tcp_keepalive' => 0,
49-
'lazy' => false,
40+
'timeout' => 3,
41+
'read_write_timeout' => 0,
42+
'tcp_nodelay' => true,
5043
'database' => '1',
51-
'password' => null,
5244
);
5345
$this->assertSame($params, $connection->getParameters()->toArray());
5446
}

src/Symfony/Component/Cache/Tests/Adapter/PredisRedisClusterAdapterTest.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@
1111

1212
namespace Symfony\Component\Cache\Tests\Adapter;
1313

14+
use Symfony\Component\Cache\Adapter\RedisAdapter;
15+
1416
class PredisRedisClusterAdapterTest extends AbstractRedisAdapterTest
1517
{
1618
public static function setupBeforeClass()
1719
{
1820
if (!$hosts = getenv('REDIS_CLUSTER_HOSTS')) {
1921
self::markTestSkipped('REDIS_CLUSTER_HOSTS env var is not defined.');
2022
}
21-
self::$redis = new \Predis\Client(explode(' ', $hosts), array('cluster' => 'redis'));
23+
24+
self::$redis = RedisAdapter::createConnection('redis:?host['.str_replace(' ', ']&host[', $hosts).']', array('class' => \Predis\Client::class, 'redis_cluster' => true));
2225
}
2326

2427
public static function tearDownAfterClass()

src/Symfony/Component/Cache/Tests/Adapter/RedisAdapterTest.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ public function createCachePool($defaultLifetime = 0)
3333

3434
public function testCreateConnection()
3535
{
36+
$redis = RedisAdapter::createConnection('redis:?host[h1]&host[h2]&host[/foo:]');
37+
$this->assertInstanceOf(\RedisArray::class, $redis);
38+
$this->assertSame(array('h1:6379', 'h2:6379', '/foo'), $redis->_hosts());
39+
@$redis = null; // some versions of phpredis connect on destruct, let's silence the warning
40+
3641
$redisHost = getenv('REDIS_HOST');
3742

3843
$redis = RedisAdapter::createConnection('redis://'.$redisHost);

src/Symfony/Component/Cache/Tests/Adapter/RedisClusterAdapterTest.php

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111

1212
namespace Symfony\Component\Cache\Tests\Adapter;
1313

14+
use Symfony\Component\Cache\Adapter\AbstractAdapter;
15+
use Symfony\Component\Cache\Adapter\RedisAdapter;
16+
use Symfony\Component\Cache\Traits\RedisClusterProxy;
17+
1418
class RedisClusterAdapterTest extends AbstractRedisAdapterTest
1519
{
1620
public static function setupBeforeClass()
@@ -22,6 +26,33 @@ public static function setupBeforeClass()
2226
self::markTestSkipped('REDIS_CLUSTER_HOSTS env var is not defined.');
2327
}
2428

25-
self::$redis = new \RedisCluster(null, explode(' ', $hosts));
29+
self::$redis = AbstractAdapter::createConnection('redis:?host['.str_replace(' ', ']&host[', $hosts).']', array('lazy' => true, 'redis_cluster' => true));
30+
}
31+
32+
public function createCachePool($defaultLifetime = 0)
33+
{
34+
$this->assertInstanceOf(RedisClusterProxy::class, self::$redis);
35+
$adapter = new RedisAdapter(self::$redis, str_replace('\\', '.', __CLASS__), $defaultLifetime);
36+
37+
return $adapter;
38+
}
39+
40+
/**
41+
* @dataProvider provideFailedCreateConnection
42+
* @expectedException \Symfony\Component\Cache\Exception\InvalidArgumentException
43+
* @expectedExceptionMessage Redis connection failed
44+
*/
45+
public function testFailedCreateConnection($dsn)
46+
{
47+
RedisAdapter::createConnection($dsn);
48+
}
49+
50+
public function provideFailedCreateConnection()
51+
{
52+
return array(
53+
array('redis://localhost:1234?redis_cluster=1'),
54+
array('redis://foo@localhost?redis_cluster=1'),
55+
array('redis://localhost/123?redis_cluster=1'),
56+
);
2657
}
2758
}

src/Symfony/Component/Cache/Traits/RedisTrait.php

Lines changed: 141 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
use Predis\Connection\Aggregate\ClusterInterface;
1515
use Predis\Connection\Aggregate\RedisCluster;
16-
use Predis\Connection\Factory;
1716
use Predis\Response\Status;
1817
use Symfony\Component\Cache\Exception\CacheException;
1918
use Symfony\Component\Cache\Exception\InvalidArgumentException;
@@ -37,7 +36,10 @@ trait RedisTrait
3736
'retry_interval' => 0,
3837
'compression' => true,
3938
'tcp_keepalive' => 0,
40-
'lazy' => false,
39+
'lazy' => null,
40+
'redis_cluster' => false,
41+
'dbindex' => 0,
42+
'failover' => 'none',
4143
);
4244
private $redis;
4345
private $marshaller;
@@ -53,7 +55,7 @@ private function init($redisClient, $namespace, $defaultLifetime, ?MarshallerInt
5355
throw new InvalidArgumentException(sprintf('RedisAdapter namespace contains "%s" but only characters in [-+_.A-Za-z0-9] are allowed.', $match[0]));
5456
}
5557
if (!$redisClient instanceof \Redis && !$redisClient instanceof \RedisArray && !$redisClient instanceof \RedisCluster && !$redisClient instanceof \Predis\Client && !$redisClient instanceof RedisProxy && !$redisClient instanceof RedisClusterProxy) {
56-
throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given', __METHOD__, \is_object($redisClient) ? \get_class($redisClient) : \gettype($redisClient)));
58+
throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given.', __METHOD__, \is_object($redisClient) ? \get_class($redisClient) : \gettype($redisClient)));
5759
}
5860
$this->redis = $redisClient;
5961
$this->marshaller = $marshaller ?? new DefaultMarshaller();
@@ -74,57 +76,87 @@ private function init($redisClient, $namespace, $defaultLifetime, ?MarshallerInt
7476
*
7577
* @throws InvalidArgumentException when the DSN is invalid
7678
*
77-
* @return \Redis|\Predis\Client According to the "class" option
79+
* @return \Redis|\RedisCluster|\Predis\Client According to the "class" option
7880
*/
7981
public static function createConnection($dsn, array $options = array())
8082
{
81-
if (0 !== strpos($dsn, 'redis://')) {
82-
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s does not start with "redis://"', $dsn));
83+
if (0 !== strpos($dsn, 'redis:')) {
84+
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s does not start with "redis:".', $dsn));
8385
}
84-
$params = preg_replace_callback('#^redis://(?:(?:[^:@]*+:)?([^@]*+)@)?#', function ($m) use (&$auth) {
85-
if (isset($m[1])) {
86-
$auth = $m[1];
86+
87+
if (!\extension_loaded('redis') && !class_exists(\Predis\Client::class)) {
88+
throw new CacheException(sprintf('Cannot find the "redis" extension nor the "predis/predis" package: %s', $dsn));
89+
}
90+
91+
$params = preg_replace_callback('#^redis:(//)?(?:(?:[^:@]*+:)?([^@]*+)@)?#', function ($m) use (&$auth) {
92+
if (isset($m[2])) {
93+
$auth = $m[2];
8794
}
8895

89-
return 'file://';
96+
return 'file:'.($m[1] ?? '');
9097
}, $dsn);
91-
if (false === $params = parse_url($params)) {
92-
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
93-
}
94-
if (!isset($params['host']) && !isset($params['path'])) {
98+
99+
if (false === $params = parse_url($dsn)) {
95100
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
96101
}
97-
if (isset($params['path']) && preg_match('#/(\d+)$#', $params['path'], $m)) {
98-
$params['dbindex'] = $m[1];
99-
$params['path'] = substr($params['path'], 0, -\strlen($m[0]));
100-
}
101-
if (isset($params['host'])) {
102-
$scheme = 'tcp';
103-
} else {
104-
$scheme = 'unix';
105-
}
106-
$params += array(
107-
'host' => isset($params['host']) ? $params['host'] : $params['path'],
108-
'port' => isset($params['host']) ? 6379 : null,
109-
'dbindex' => 0,
110-
);
102+
103+
$query = $hosts = array();
104+
111105
if (isset($params['query'])) {
112106
parse_str($params['query'], $query);
113-
$params += $query;
107+
108+
if (isset($query['host'])) {
109+
if (!\is_array($hosts = $query['host'])) {
110+
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
111+
}
112+
foreach ($hosts as $host => $parameters) {
113+
if (\is_string($parameters)) {
114+
parse_str($parameters, $parameters);
115+
}
116+
if (false === $i = strrpos($host, ':')) {
117+
$hosts[$host] = array('scheme' => 'tcp', 'host' => $host, 'port' => 6379) + $parameters;
118+
} elseif ($port = (int) substr($host, 1 + $i)) {
119+
$hosts[$host] = array('scheme' => 'tcp', 'host' => substr($host, 0, $i), 'port' => $port) + $parameters;
120+
} else {
121+
$hosts[$host] = array('scheme' => 'unix', 'path' => substr($host, 0, $i)) + $parameters;
122+
}
123+
}
124+
$hosts = array_values($hosts);
125+
}
126+
}
127+
128+
if (isset($params['host']) || isset($params['path'])) {
129+
if (!isset($params['dbindex']) && isset($params['path']) && preg_match('#/(\d+)$#', $params['path'], $m)) {
130+
$params['dbindex'] = $m[1];
131+
$params['path'] = substr($params['path'], 0, -\strlen($m[0]));
132+
}
133+
134+
if (isset($params['host'])) {
135+
array_unshift($hosts, array('scheme' => 'tcp', 'host' => $params['host'], 'port' => $params['port'] ?? 6379));
136+
} else {
137+
array_unshift($hosts, array('scheme' => 'unix', 'path' => $params['path']));
138+
}
114139
}
115-
$params += $options + self::$defaultConnectionOptions;
116-
if (null === $params['class'] && !\extension_loaded('redis') && !class_exists(\Predis\Client::class)) {
117-
throw new CacheException(sprintf('Cannot find the "redis" extension, and "predis/predis" is not installed: %s', $dsn));
140+
141+
if (!$hosts) {
142+
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
143+
}
144+
145+
$params += $query + $options + self::$defaultConnectionOptions;
146+
147+
if (null === $params['class'] && \extension_loaded('redis')) {
148+
$class = $params['redis_cluster'] ? \RedisCluster::class : (1 < \count($hosts) ? \RedisArray::class : \Redis::class);
149+
} else {
150+
$class = null === $params['class'] ? \Predis\Client::class : $params['class'];
118151
}
119-
$class = null === $params['class'] ? (\extension_loaded('redis') ? \Redis::class : \Predis\Client::class) : $params['class'];
120152

121153
if (is_a($class, \Redis::class, true)) {
122154
$connect = $params['persistent'] || $params['persistent_id'] ? 'pconnect' : 'connect';
123155
$redis = new $class();
124156

125-
$initializer = function ($redis) use ($connect, $params, $dsn, $auth) {
157+
$initializer = function ($redis) use ($connect, $params, $dsn, $auth, $hosts) {
126158
try {
127-
@$redis->{$connect}($params['host'], $params['port'], $params['timeout'], $params['persistent_id'], $params['retry_interval']);
159+
@$redis->{$connect}($hosts[0]['host'], $hosts[0]['port'], $params['timeout'], (string) $params['persistent_id'], $params['retry_interval']);
128160
} catch (\RedisException $e) {
129161
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
130162
}
@@ -160,15 +192,82 @@ public static function createConnection($dsn, array $options = array())
160192
} else {
161193
$initializer($redis);
162194
}
195+
} elseif (is_a($class, \RedisArray::class, true)) {
196+
foreach ($hosts as $i => $host) {
197+
$hosts[$i] = 'tcp' === $host['scheme'] ? $host['host'].':'.$host['port'] : $host['path'];
198+
}
199+
$params['lazy_connect'] = $params['lazy'] ?? true;
200+
$params['connect_timeout'] = $params['timeout'];
201+
202+
try {
203+
$redis = new $class($hosts, $params);
204+
} catch (\RedisClusterException $e) {
205+
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
206+
}
207+
208+
if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) {
209+
$redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
210+
}
211+
if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) {
212+
$redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF);
213+
}
214+
} elseif (is_a($class, \RedisCluster::class, true)) {
215+
$initializer = function () use ($class, $params, $dsn, $hosts) {
216+
foreach ($hosts as $i => $host) {
217+
$hosts[$i] = 'tcp' === $host['scheme'] ? $host['host'].':'.$host['port'] : $host['path'];
218+
}
219+
220+
try {
221+
$redis = new $class(null, $hosts, $params['timeout'], $params['read_timeout'], (bool) $params['persistent']);
222+
} catch (\RedisClusterException $e) {
223+
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
224+
}
225+
226+
if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) {
227+
$redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
228+
}
229+
if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) {
230+
$redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF);
231+
}
232+
switch ($params['failover']) {
233+
case 'error': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_ERROR); break;
234+
case 'distribute': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_DISTRIBUTE); break;
235+
case 'slaves': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_DISTRIBUTE_SLAVES); break;
236+
}
237+
238+
return $redis;
239+
};
240+
241+
$redis = $params['lazy'] ? new RedisClusterProxy($initializer) : $initializer();
163242
} elseif (is_a($class, \Predis\Client::class, true)) {
164-
$params['scheme'] = $scheme;
165-
$params['database'] = $params['dbindex'] ?: null;
166-
$params['password'] = $auth;
167-
$redis = new $class((new Factory())->create($params));
243+
if ($params['redis_cluster']) {
244+
$params['cluster'] = 'redis';
245+
}
246+
$params += array('parameters' => array());
247+
$params['parameters'] += array(
248+
'persistent' => $params['persistent'],
249+
'timeout' => $params['timeout'],
250+
'read_write_timeout' => $params['read_timeout'],
251+
'tcp_nodelay' => true,
252+
);
253+
if ($params['dbindex']) {
254+
$params['parameters']['database'] = $params['dbindex'];
255+
}
256+
if (null !== $auth) {
257+
$params['parameters']['password'] = $auth;
258+
}
259+
if (1 === \count($hosts) && !$params['redis_cluster']) {
260+
$hosts = $hosts[0];
261+
} elseif (\in_array($params['failover'], array('slaves', 'distribute'), true) && !isset($params['replication'])) {
262+
$params['replication'] = true;
263+
$hosts[0] += array('alias' => 'master');
264+
}
265+
266+
$redis = new $class($hosts, array_diff_key($params, self::$defaultConnectionOptions));
168267
} elseif (class_exists($class, false)) {
169-
throw new InvalidArgumentException(sprintf('"%s" is not a subclass of "Redis" or "Predis\Client"', $class));
268+
throw new InvalidArgumentException(sprintf('"%s" is not a subclass of "Redis", "RedisArray", "RedisCluster" nor "Predis\Client".', $class));
170269
} else {
171-
throw new InvalidArgumentException(sprintf('Class "%s" does not exist', $class));
270+
throw new InvalidArgumentException(sprintf('Class "%s" does not exist.', $class));
172271
}
173272

174273
return $redis;
@@ -183,7 +282,6 @@ protected function doFetch(array $ids)
183282
return array();
184283
}
185284

186-
$i = -1;
187285
$result = array();
188286

189287
if ($this->redis instanceof \Predis\Client) {
@@ -244,6 +342,7 @@ protected function doClear($namespace)
244342
$h->connect($host[0], $host[1]);
245343
}
246344
}
345+
247346
foreach ($hosts as $host) {
248347
if (!isset($namespace[0])) {
249348
$cleared = $host->flushDb() && $cleared;

src/Symfony/Component/Cache/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
"cache/integration-tests": "dev-master",
3333
"doctrine/cache": "~1.6",
3434
"doctrine/dbal": "~2.5",
35-
"predis/predis": "~1.0",
35+
"predis/predis": "~1.1",
3636
"symfony/config": "~4.2",
3737
"symfony/dependency-injection": "~3.4",
3838
"symfony/var-dumper": "^4.1.1"

0 commit comments

Comments
 (0)
0