8000 [Cache] added support for connecting to Redis clusters via DSN by nicolas-grekas · Pull Request #28713 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Cache] added support for connecting to Redis clusters via DSN #28713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
"doctrine/doctrine-bundle": "~1.4",
"monolog/monolog": "~1.11",
"ocramius/proxy-manager": "~0.4|~1.0|~2.0",
"predis/predis": "~1.0",
"predis/predis": "~1.1",
"egulias/email-validator": "~1.2,>=1.2.8|~2.0",
"symfony/phpunit-bridge": "~3.4|~4.0",
"symfony/security-acl": "~2.8|~3.0",
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Cache/Adapter/AbstractAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public static function createConnection($dsn, array $options = array())
if (!\is_string($dsn)) {
throw new InvalidArgumentException(sprintf('The %s() method expect argument #1 to be string, %s given.', __METHOD__, \gettype($dsn)));
}
if (0 === strpos($dsn, 'redis://')) {
if (0 === strpos($dsn, 'redis:')) {
return RedisAdapter::createConnection($dsn, $options);
}
if (0 === strpos($dsn, 'memcached:')) {
Expand Down
3 changes: 2 additions & 1 deletion src/Symfony/Component/Cache/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ CHANGELOG
4.2.0
-----

* added support for configuring multiple Memcached servers in one DSN
* added support for connecting to Redis clusters via DSN
* added support for configuring multiple Memcached servers via DSN
* added `MarshallerInterface` and `DefaultMarshaller` to allow changing the serializer and provide one that automatically uses igbinary when available
* added `CacheInterface`, which provides stampede protection via probabilistic early expiration and should become the preferred way to use a cache
* added sub-second expiry accuracy for backends that support it
Expand Down
16 changes: 4 additions & 12 deletions src/Symfony/Component/Cache/Tests/Adapter/PredisAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,13 @@ public function testCreateConnection()

$params = array(
'scheme' => 'tcp',
'host' => $redisHost,
'path' => '',
'dbindex' => '1',
'host' => 'localhost',
'port' => 6379,
'class' => 'Predis\Client',
'timeout' => 3,
'persistent' => 0,
'persistent_id' => null,
'read_timeout' => 0,
'retry_interval' => 0,
'compression' => true,
'tcp_keepalive' => 0,
'lazy' => false,
'timeout' => 3,
'read_write_timeout' => 0,
'tcp_nodelay' => true,
'database' => '1',
'password' => null,
);
$this->assertSame($params, $connection->getParameters()->toArray());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@

namespace Symfony\Component\Cache\Tests\Adapter;

use Symfony\Component\Cache\Adapter\RedisAdapter;

class PredisRedisClusterAdapterTest extends AbstractRedisAdapterTest
{
public static function setupBeforeClass()
{
if (!$hosts = getenv('REDIS_CLUSTER_HOSTS')) {
self::markTestSkipped('REDIS_CLUSTER_HOSTS env var is not defined.');
}
self::$redis = new \Predis\Client(explode(' ', $hosts), array('cluster' => 'redis'));

self::$redis = RedisAdapter::createConnection('redis:?host['.str_replace(' ', ']&host[', $hosts).']', array('class' => \Predis\Client::class, 'redis_cluster' => true));
}

public static function tearDownAfterClass()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public function createCachePool($defaultLifetime = 0)

public function testCreateConnection()
{
$redis = RedisAdapter::createConnection('redis:?host[h1]&host[h2]&host[/foo:]');
$this->assertInstanceOf(\RedisArray::class, $redis);
$this->assertSame(array('h1:6379', 'h2:6379', '/foo'), $redis->_hosts());
@$redis = null; // some versions of phpredis connect on destruct, let's silence the warning

$redisHost = getenv('REDIS_HOST');

$redis = RedisAdapter::createConnection('redis://'.$redisHost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

namespace Symfony\Component\Cache\Tests\Adapter;

use Symfony\Component\Cache\Adapter\AbstractAdapter;
use Symfony\Component\Cache\Adapter\RedisAdapter;
use Symfony\Component\Cache\Traits\RedisClusterProxy;

class RedisClusterAdapterTest extends AbstractRedisAdapterTest
{
public static function setup 6D4E BeforeClass()
Expand All @@ -22,6 +26,33 @@ public static function setupBeforeClass()
self::markTestSkipped('REDIS_CLUSTER_HOSTS env var is not defined.');
}

self::$redis = new \RedisCluster(null, explode(' ', $hosts));
self::$redis = AbstractAdapter::createConnection('redis:?host['.str_replace(' ', ']&host[', $hosts).']', array('lazy' => true, 'redis_cluster' => true));
}

public function createCachePool($defaultLifetime = 0)
{
$this->assertInstanceOf(RedisClusterProxy::class, self::$redis);
$adapter = new RedisAdapter(self::$redis, str_replace('\\', '.', __CLASS__), $defaultLifetime);

return $adapter;
}

/**
* @dataProvider provideFailedCreateConnection
* @expectedException \Symfony\Component\Cache\Exception\InvalidArgumentException
* @expectedExceptionMessage Redis connection failed
*/
public function testFailedCreateConnection($dsn)
{
RedisAdapter::createConnection($dsn);
}

public function provideFailedCreateConnection()
{
return array(
array('redis://localhost:1234?redis_cluster=1'),
array('redis://foo@localhost?redis_cluster=1'),
array('redis://localhost/123?redis_cluster=1'),
);
}
}
183 changes: 141 additions & 42 deletions src/Symfony/Component/Cache/Traits/RedisTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

use Predis\Connection\Aggregate\ClusterInterface;
use Predis\Connection\Aggregate\RedisCluster;
use Predis\Connection\Factory;
use Predis\Response\Status;
use Symfony\Component\Cache\Exception\CacheException;
use Symfony\Component\Cache\Exception\InvalidArgumentException;
Expand All @@ -37,7 +36,10 @@ trait RedisTrait
'retry_interval' => 0,
'compression' => true,
'tcp_keepalive' => 0,
'lazy' => false,
'lazy' => null,
'redis_cluster' => false,
'dbindex' => 0,
'failover' => 'none',
);
private $redis;
private $marshaller;
Expand All @@ -53,7 +55,7 @@ private function init($redisClient, $namespace, $defaultLifetime, ?MarshallerInt
throw new InvalidArgumentException(sprintf('RedisAdapter namespace contains "%s" but only characters in [-+_.A-Za-z0-9] are allowed.', $match[0]));
}
if (!$redisClient instanceof \Redis && !$redisClient instanceof \RedisArray && !$redisClient instanceof \RedisCluster && !$redisClient instanceof \Predis\Client && !$redisClient instanceof RedisProxy && !$redisClient instanceof RedisClusterProxy) {
throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given', __METHOD__, \is_object($redisClient) ? \get_class($redisClient) : \gettype($redisClient)));
throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given.', __METHOD__, \is_object($redisClient) ? \get_class($redisClient) : \gettype($redisClient)));
}
$this->redis = $redisClient;
$this->marshaller = $marshaller ?? new DefaultMarshaller();
Expand All @@ -74,57 +76,87 @@ private function init($redisClient, $namespace, $defaultLifetime, ?MarshallerInt
*
* @throws InvalidArgumentException when the DSN is invalid
*
* @return \Redis|\Predis\Client According to the "class" option
* @return \Redis|\RedisCluster|\Predis\Client According to the "class" option
*/
public static function createConnection($dsn, array $options = array())
{
if (0 !== strpos($dsn, 'redis://')) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s does not start with "redis://"', $dsn));
if (0 !== strpos($dsn, 'redis:')) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s does not start with "redis:".', $dsn));
}
$params = preg_replace_callback('#^redis://(?:(?:[^:@]*+:)?([^@]*+)@)?#', function ($m) use (&$auth) {
if (isset($m[1])) {
$auth = $m[1];

if (!\extension_loaded('redis') && !class_exists(\Predis\Client::class)) {
throw new CacheException(sprintf('Cannot find the "redis" extension nor the "predis/predis" package: %s', $dsn));
}

$params = preg_replace_callback('#^redis:(//)?(?:(?:[^:@]*+:)?([^@]*+)@)?#', function ($m) use (&$auth) {
if (isset($m[2])) {
$auth = $m[2];
}

return 'file://';
return 'file:'.($m[1] ?? '');
}, $dsn);
if (false === $params = parse_url($params)) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
}
if (!isset($params['host']) && !isset($params['path'])) {

if (false === $params = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
}
if (isset($params['path']) && preg_match('#/(\d+)$#', $params['path'], $m)) {
$params['dbindex'] = $m[1];
$params['path'] = substr($params['path'], 0, -\strlen($m[0]));
}
if (isset($params['host'])) {
$scheme = 'tcp';
} else {
$scheme = 'unix';
}
$params += array(
'host' => isset($params['host']) ? $params['host'] : $params['path'],
'port' => isset($params['host']) ? 6379 : null,
'dbindex' => 0,
);

$query = $hosts = array();

if (isset($params['query'])) {
parse_str($params['query'], $query);
$params += $query;

if (isset($query['host'])) {
if (!\is_array($hosts = $query['host'])) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
}
foreach ($hosts as $host => $parameters) {
if (\is_string($parameters)) {
parse_str($parameters, $parameters);
}
if (false === $i = strrpos($host, ':')) {
$hosts[$host] = array('scheme' => 'tcp', 'host' => $host, 'port' => 6379) + $parameters;
} elseif ($port = (int) substr($host, 1 + $i)) {
$hosts[$host] = array('scheme' => 'tcp', 'host' => substr($host, 0, $i), 'port' => $port) + $parameters;
} else {
$hosts[$host] = array('scheme' => 'unix', 'path' => substr($host, 0, $i)) + $parameters;
}
}
$hosts = array_values($hosts);
}
}

if (isset($params['host']) || isset($params['path'])) {
if (!isset($params['dbindex']) && isset($params['path']) && preg_match('#/(\d+)$#', $params['path'], $m)) {
$params['dbindex'] = $m[1];
$params['path'] = substr($params['path'], 0, -\strlen($m[0]));
}

if (isset($params['host'])) {
array_unshift($hosts, array('scheme' => 'tcp', 'host' => $params['host'], 'port' => $params['port'] ?? 6379));
} else {
array_unshift($hosts, array('scheme' => 'unix', 'path' => $params['path']));
}
}
$params += $options + self::$defaultConnectionOptions;
if (null === $params['class'] && !\extension_loaded('redis') && !class_exists(\Predis\Client::class)) {
throw new CacheException(sprintf('Cannot find the "redis" extension, and "predis/predis" is not installed: %s', $dsn));

if (!$hosts) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
}

$params += $query + $options + self::$defaultConnectionOptions;

if (null === $params['class'] && \extension_loaded('redis')) {
$class = $params['redis_cluster'] ? \RedisCluster::class : (1 < \count($hosts) ? \RedisArray::class : \Redis::class);
} else {
$class = null === $params['class'] ? \Predis\Client::class : $params['class'];
}
$class = null === $params['class'] ? (\extension_loaded('redis') ? \Redis::class : \Predis\Client::class) : $params['class'];

if (is_a($class, \Redis::class, true)) {
$connect = $params['persistent'] || $params['persistent_id'] ? 'pconnect' : 'connect';
$redis = new $class();

$initializer = function ($redis) use ($connect, $params, $dsn, $auth) {
$initializer = function ($redis) use ($connect, $params, $dsn, $auth, $hosts) {
try {
@$redis->{$connect}($params['host'], $params['port'], $params['timeout'], $params['persistent_id'], $params['retry_interval']);
@$redis->{$connect}($hosts[0]['host'], $hosts[0]['port'], $params['timeout'], (string) $params['persistent_id'], $params['retry_interval']);
} catch (\RedisException $e) {
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
}
Expand Down Expand Up @@ -160,15 +192,82 @@ public static function createConnection($dsn, array $options = array())
} else {
$initializer($redis);
}
} elseif (is_a($class, \RedisArray::class, true)) {
foreach ($hosts as $i => $host) {
$hosts[$i] = 'tcp' === $host['scheme'] ? $host['host'].':'.$host['port'] : $host['path'];
}
$params['lazy_connect'] = $params['lazy'] ?? true;
$params['connect_timeout'] = $params['timeout'];

try {
$redis = new $class($hosts, $params);
} catch (\RedisClusterException $e) {
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
}

if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) {
$redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
}
if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) {
$redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF);
}
} elseif (is_a($class, \RedisCluster::class, true)) {
$initializer = function () use ($class, $params, $dsn, $hosts) {
foreach ($hosts as $i => $host) {
$hosts[$i] = 'tcp' === $host['scheme'] ? $host['host'].':'.$host['port'] : $host['path'];
}

try {
$redis = new $class(null, $hosts, $params['timeout'], $params['read_timeout'], (bool) $params['persistent']);
} catch (\RedisClusterException $e) {
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
}

if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) {
$redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
}
if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) {
$redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF);
}
switch ($params['failover']) {
case 'error': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_ERROR); break;
case 'distribute': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_DISTRIBUTE); break;
case 'slaves': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_DISTRIBUTE_SLAVES); break;
}

return $redis;
};

$redis = $params['lazy'] ? new RedisClusterProxy($initializer) : $initializer();
} elseif (is_a($class, \Predis\Client::class, true)) {
$params['scheme'] = $scheme;
$params['database'] = $params['dbindex'] ?: null;
$params['password'] = $auth;
$redis = new $class((new Factory())->create($params));
if ($params['redis_cluster']) {
$params['cluster'] = 'redis';
}
$params += array('parameters' => array());
$params['parameters'] += array(
'persistent' => $params['persistent'],
'timeout' => $params['timeout'],
'read_write_timeout' => $params['read_timeout'],
'tcp_nodelay' => true,
);
if ($params['dbindex']) {
$params['parameters']['database'] = $params['dbindex'];
}
if (null !== $auth) {
$params['parameters']['password'] = $auth;
}
if (1 === \count($hosts) && !$params['redis_cluster']) {
$hosts = $hosts[0];
} elseif (\in_array($params['failover'], array('slaves', 'distribute'), true) && !isset($params['replication'])) {
$params['replication'] = true;
$hosts[0] += array('alias' => 'master');
}

$redis = new $class($hosts, array_diff_key($params, self::$defaultConnectionOptions));
} elseif (class_exists($class, false)) {
throw new InvalidArgumentException(sprintf('"%s" is not a subclass of "Redis" or "Predis\Client"', $class));
throw new InvalidArgumentException(sprintf('"%s" is not a subclass of "Redis", "RedisArray", "RedisCluster" nor "Predis\Client".', $class));
} else {
throw new InvalidArgumentException(sprintf('Class "%s" does not exist', $class));
throw new InvalidArgumentException(sprintf('Class "%s" does not exist.', $class));
}

return $redis;
Expand All @@ -183,7 +282,6 @@ protected function doFetch(array $ids)
return array();
}

$i = -1;
$result = array();

if ($this->redis instanceof \Predis\Client) {
Expand Down Expand Up @@ -244,6 +342,7 @@ protected function doClear($namespace)
$h->connect($host[0], $host[1]);
}
}

foreach ($hosts as $host) {
if (!isset($namespace[0])) {
$cleared = $host->flushDb() && $cleared;
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Cache/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"cache/integration-tests": "dev-master",
"doctrine/cache": "~1.6",
"doctrine/dbal": "~2.5",
"predis/predis": "~1.0",
"predis/predis": "~1.1",
"symfony/config": "~4.2",
"symfony/dependency-injection": "~3.4",
"symfony/var-dumper": "^4.1.1"
Expand Down
0