Add a new failover option prioritising nodes by aolley · Pull Request #1896 · phpredis/phpredis · GitHub
[go: up one dir, main page]

Skip to content

Add a new failover option prioritising nodes #1896

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 91 additions & 5 deletions cluster_library.c
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,10 @@ PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout,
ALLOC_HASHTABLE(c->nodes);
zend_hash_init(c->nodes, 0, NULL, ht_free_node, 0);

/* Allocate our preferred nodes HashTable */
ALLOC_HASHTABLE(c->preferred_nodes);
zend_hash_init(c->preferred_nodes, 0, NULL, NULL, 0);

return c;
}

Expand All @@ -861,10 +865,12 @@ cluster_free(redisCluster *c, int free_ctx)
/* Call hash table destructors */
zend_hash_destroy(c->seeds);
zend_hash_destroy(c->nodes);
zend_hash_destroy(c->preferred_nodes);

/* Free hash tables themselves */
efree(c->seeds);
efree(c->nodes);
efree(c->preferred_nodes);

/* Free any error we've got */
if (c->err) zend_string_release(c->err);
Expand Down Expand Up @@ -1235,10 +1241,72 @@ PHP_REDIS_API void cluster_disconnect(redisCluster *c, int force) {
} ZEND_HASH_FOREACH_END();
}

/* qsort() comparator for preferredNode entries.
 *
 * An entry flagged preferred (1) sorts ahead of one that is not (0).  In
 * every other case the entries are ordered by ascending original_order, so
 * the relative order they had before sorting is kept.
 *
 * Returns a negative value, zero or a positive value in the usual qsort()
 * convention. */
int preferred_compare(const void *const first, const void *const second)
{
    const preferredNode *lhs = (const preferredNode *)first;
    const preferredNode *rhs = (const preferredNode *)second;

    /* The preference flag only decides when exactly one side carries it */
    if (lhs->preferred == 1 && rhs->preferred == 0) {
        return -1;
    }
    if (lhs->preferred == 0 && rhs->preferred == 1) {
        return 1;
    }

    /* Otherwise fall back to the position held before sorting */
    return (lhs->original_order > rhs->original_order) -
           (lhs->original_order < rhs->original_order);
}

/* Move preferred nodes to the front of an already randomised node list.
 *
 * array - node indexes for the slot (0 = master); reordered in place
 * len   - number of entries in array
 * c     - cluster context; c->preferred_nodes holds the "host:port" strings
 *         of the nodes we would rather talk to
 * slot  - slot whose nodes are listed in array
 *
 * Nodes whose "host:port" appears in c->preferred_nodes are moved ahead of
 * all others.  Within the preferred and non-preferred groups the incoming
 * (shuffled) order is kept.  Entries for which no socket can be looked up
 * are treated as not preferred. */
static void preferredsort(int *array, size_t len, redisCluster *c,
                          unsigned short slot)
{
    size_t i;
    int written;
    RedisSock *redis_sock;
    char key[1024];
    zval *node;

    /* Nothing to reorder; also avoids declaring a zero-length VLA below */
    if (len == 0) return;

    struct preferredNode ab[len];

    /* array: key => order; value => node-idx (0=master) */
    for (i = 0; i < len; i++) {
        ab[i].idx = array[i];
        ab[i].original_order = (int)i;
        ab[i].preferred = 0;

        /* Look the node up in the slot we were asked about */
        redis_sock = cluster_slot_sock(c, slot, array[i]);
        if (!redis_sock) {
            continue;
        }

        /* Build the node's host+port string; a truncated key can never match */
        written = snprintf(key, sizeof(key), "%s:%d",
                           ZSTR_VAL(redis_sock->host), redis_sock->port);
        if (written < 0 || (size_t)written >= sizeof(key)) {
            continue;
        }

        /* Is it in the preferred_nodes map?  Perhaps the table should be
         * keyed on the host:port string to make this easier and faster. */
        ZEND_HASH_FOREACH_VAL(c->preferred_nodes, node) {
            /* The table is a copy of a user supplied array, so an entry may
             * be of any type; only strings can name a node. */
            if (Z_TYPE_P(node) == IS_STRING &&
                strcmp(Z_STRVAL_P(node), key) == 0)
            {
                ab[i].preferred = 1;
                break;
            }
        } ZEND_HASH_FOREACH_END();
    }

    /* Sort preferred nodes to the top of the list. */
    qsort(ab, len, sizeof(*ab), preferred_compare);
    for (i = 0; i < len; i++) {
        array[i] = ab[i].idx;
    }
}


/* This method attempts to write our command at random to the master and any
 * attached slaves, until we either successfully do so, or fail. */
static int cluster_dist_write(redisCluster *c, const char *cmd, size_t sz,
int nomaster)
int nomaster, int preferred)
{
int i, count = 1, *nodes;
RedisSock *redis_sock;
Expand All @@ -1256,6 +1324,11 @@ static int cluster_dist_write(redisCluster *c, const char *cmd, size_t sz,
for (i = 0; i < count; i++) nodes[i] = i;
fyshuffle(nodes, count);

/* Shift preferred nodes to the top of the list if we're in preferred
* mode */
if (preferred && zend_hash_num_elements(c->preferred_nodes) > 0)
preferredsort(nodes, count, c, c->cmd_slot);

/* Iterate through our nodes until we find one we can write to or fail */
for (i = 0; i < count; i++) {
/* Skip if this is the master node and we don't want to query that */
Expand Down Expand Up @@ -1306,6 +1379,8 @@ static int cluster_dist_write(redisCluster *c, const char *cmd, size_t sz,
* REDIS_FAILOVER_DISTRIBUTE_SLAVES:
* We pick at random from slave nodes of a given master. This option is
* used to load balance read queries against N slaves.
* REDIS_FAILOVER_PREFERRED:
* Similar to DISTRIBUTE, but with a list of nodes we prefer over others.
*
* Once we are able to find a node we can write to, we check for MOVED or
* ASKING redirection, such that the keyspace can be updated.
Expand All @@ -1315,7 +1390,7 @@ static int cluster_sock_write(redisCluster *c, const char *cmd, size_t sz,
{
redisClusterNode *seed_node;
RedisSock *redis_sock;
int failover, nomaster;
int failover, nomaster, preferred;

/* First try the socket requested */
redis_sock = c->cmd_sock;
Expand Down Expand Up @@ -1343,12 +1418,13 @@ static int cluster_sock_write(redisCluster *c, const char *cmd, size_t sz,
} else if (failover == REDIS_FAILOVER_ERROR) {
/* Try the master, then fall back to any slaves we may have */
if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz) ||
!cluster_dist_write(c, cmd, sz, 1)) return 0;
!cluster_dist_write(c, cmd, sz, 1, 0)) return 0;
} else {
/* Include or exclude master node depending on failover option and
* attempt to make our write */
nomaster = failover == REDIS_FAILOVER_DISTRIBUTE_SLAVES;
if (!cluster_dist_write(c, cmd, sz, nomaster)) {
preferred = failover == REDIS_FAILOVER_PREFERRED;
if (!cluster_dist_write(c, cmd, sz, nomaster, preferred)) {
/* We were able to write to a master or slave at random */
return 0;
}
Expand Down Expand Up @@ -1520,7 +1596,7 @@ PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd,
PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char *cmd,
int cmd_len)
{
int resp, timedout = 0;
int failovertoggle, resp, timedout = 0;
long msstart;

if (!SLOT(c, slot)) {
Expand Down Expand Up @@ -1577,6 +1653,11 @@ PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char
/* For MOVED redirection we want to update our cached mapping */
cluster_update_slot(c);
c->cmd_sock = SLOT_SOCK(c, slot);
if (c->failover == REDIS_FAILOVER_PREFERRED) {
// On MOVED, turn off preferred mode until we're done with the command.
c->failover = REDIS_FAILOVER_NONE;
failovertoggle = REDIS_FAILOVER_PREFERRED;
}
} else if (c->redir_type == REDIR_ASK) {
/* For ASK redirection we want to redirect but not update slot mapping */
c->cmd_sock = cluster_get_asking_sock(c);
Expand Down Expand Up @@ -1607,6 +1688,11 @@ PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char
/* Clear redirection flag */
c->redir_type = REDIR_NONE;

// If we changed failover mode, switch it back.
if (failovertoggle) {
c->failover = failovertoggle;
}

// Success, return the slot where data exists.
return 0;
}
Expand Down
9 changes: 9 additions & 0 deletions cluster_library.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ typedef struct redisCluster {
/* All RedisCluster objects we've created/are connected to */
HashTable *nodes;

/* A list of nodes we prefer to talk to when in FAILOVER_PREFERRED mode */
HashTable *preferred_nodes;

/* Transaction handling linked list, and where we are as we EXEC */
clusterFoldItem *multi_head;
clusterFoldItem *multi_curr;
Expand Down Expand Up @@ -259,6 +262,12 @@ typedef struct redisCluster {
/* RedisCluster response processing callback */
typedef void (*cluster_cb)(INTERNAL_FUNCTION_PARAMETERS, redisCluster*, void*);

typedef struct preferredNode {
int idx;
int preferred;
int original_order;
} preferredNode;

/* Context for processing transactions */
struct clusterFoldItem {
/* Response processing callback */
Expand Down
2 changes: 2 additions & 0 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,14 @@ typedef enum _PUBSUB_TYPE {
#define REDIS_OPT_BACKOFF_ALGORITHM 12
#define REDIS_OPT_BACKOFF_BASE 13
#define REDIS_OPT_BACKOFF_CAP 14
#define REDIS_OPT_PREFERRED_NODES 15

/* cluster options: slave failover modes (the values stored in c->failover) */
#define REDIS_FAILOVER_NONE 0
/* Try the master first, then fall back to any slaves it may have */
#define REDIS_FAILOVER_ERROR 1
/* NOTE(review): presumably picks at random among the master and its slaves - confirm */
#define REDIS_FAILOVER_DISTRIBUTE 2
/* Pick at random from the slave nodes of a given master (master excluded) */
#define REDIS_FAILOVER_DISTRIBUTE_SLAVES 3
/* Similar to DISTRIBUTE, but nodes in the preferred nodes list are tried first */
#define REDIS_FAILOVER_PREFERRED 4
/* serializers */
typedef enum {
REDIS_SERIALIZER_NONE,
Expand Down
2 changes: 2 additions & 0 deletions redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ static void add_class_constants(zend_class_entry *ce, int is_cluster) {
zend_declare_class_constant_long(ce, ZEND_STRL("OPT_REPLY_LITERAL"), REDIS_OPT_REPLY_LITERAL);
zend_declare_class_constant_long(ce, ZEND_STRL("OPT_COMPRESSION_LEVEL"), REDIS_OPT_COMPRESSION_LEVEL);
zend_declare_class_constant_long(ce, ZEND_STRL("OPT_NULL_MULTIBULK_AS_NULL"), REDIS_OPT_NULL_MBULK_AS_NULL);
zend_declare_class_constant_long(ce, ZEND_STRL("OPT_PREFERRED_NODES"), REDIS_OPT_PREFERRED_NODES);

/* serializer */
zend_declare_class_constant_long(ce, ZEND_STRL("SERIALIZER_NONE"), REDIS_SERIALIZER_NONE);
Expand Down Expand Up @@ -379,6 +380,7 @@ static void add_class_constants(zend_class_entry *ce, int is_cluster) {
zend_declare_class_constant_long(ce, ZEND_STRL("FAILOVER_ERROR"), REDIS_FAILOVER_ERROR);
zend_declare_class_constant_long(ce, ZEND_STRL("FAILOVER_DISTRIBUTE"), REDIS_FAILOVER_DISTRIBUTE);
zend_declare_class_constant_long(ce, ZEND_STRL("FAILOVER_DISTRIBUTE_SLAVES"), REDIS_FAILOVER_DISTRIBUTE_SLAVES);
zend_declare_class_constant_long(ce, ZEND_STRL("FAILOVER_PREFERRED"), REDIS_FAILOVER_PREFERRED);
} else {
/* Cluster doesn't support pipelining at this time */
zend_declare_class_constant_long(ce, ZEND_STRL("PIPELINE"), PIPELINE);
Expand Down
4 changes: 4 additions & 0 deletions redis_cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ zend_object * create_cluster_context(zend_class_entry *class_type) {
ALLOC_HASHTABLE(cluster->nodes);
zend_hash_init(cluster->nodes, 0, NULL, ht_free_node, 0);

/* Allocate our preferred nodes HashTable */
ALLOC_HASHTABLE(cluster->preferred_nodes);
zend_hash_init(cluster->preferred_nodes, 0, NULL, NULL, 0);

// Initialize it
zend_object_std_init(&cluster->std, class_type);

Expand Down
14 changes: 13 additions & 1 deletion redis_commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -4305,6 +4305,8 @@ void redis_getoption_handler(INTERNAL_FUNCTION_PARAMETERS,
RETURN_LONG(redis_sock->backoff.base / 1000);
case REDIS_OPT_BACKOFF_CAP:
RETURN_LONG(redis_sock->backoff.cap / 1000);
case REDIS_OPT_PREFERRED_NODES:
RETURN_ARR(zend_array_dup(c->preferred_nodes));
default:
RETURN_FALSE;
}
Expand Down Expand Up @@ -4432,7 +4434,8 @@ void redis_setoption_handler(INTERNAL_FUNCTION_PARAMETERS,
if (val_long == REDIS_FAILOVER_NONE ||
val_long == REDIS_FAILOVER_ERROR ||
val_long == REDIS_FAILOVER_DISTRIBUTE ||
val_long == REDIS_FAILOVER_DISTRIBUTE_SLAVES)
val_long == REDIS_FAILOVER_DISTRIBUTE_SLAVES ||
val_long == REDIS_FAILOVER_PREFERRED)
{
c->failover = val_long;
RETURN_TRUE;
Expand Down Expand Up @@ -4467,6 +4470,15 @@ void redis_setoption_handler(INTERNAL_FUNCTION_PARAMETERS,
RETURN_TRUE;
}
break;
case REDIS_OPT_PREFERRED_NODES:
if (Z_TYPE_P(val) != IS_ARRAY)
RETURN_FALSE;
if (c->preferred_nodes) {
zend_hash_destroy(c->preferred_nodes);
FREE_HASHTABLE(c->preferred_nodes);
}
c->preferred_nodes = zend_array_dup(Z_ARRVAL_P(val));
RETURN_TRUE;
EMPTY_SWITCH_DEFAULT_CASE()
}
RETURN_FALSE;
Expand Down
26 changes: 26 additions & 0 deletions tests/RedisClusterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,32 @@ public function testFailOver() {
}
}

/**
 * Store one key per Redis type, switch the client to FAILOVER_PREFERRED with
 * a single preferred node, and read every key back.
 *
 * The test is skipped up front because it is racy: nothing guarantees the
 * slaves have caught up with the master by the time the keys are read.
 */
public function testFailOverPreferred() {
    return $this->markTestSkipped(); // this test is racy.

    $arr_expected = [];
    $arr_types = [];

    /* Create a key of each redis type we know about */
    for ($i_round = 0; $i_round < 1; $i_round++) {
        foreach ($this->_arr_redis_types as $i_redis_type) {
            $str_new_key = $this->setKeyVals($i_round, $i_redis_type, $arr_expected);
            $arr_types[$str_new_key] = $i_redis_type;
        }
    }

    // Brief pause so the slaves can catch up with the master.  This is
    // still racy, which is probably why testFailOver skips every failover
    // mode other than 'NONE'.
    usleep(1000);

    $this->redis->setOption(RedisCluster::OPT_SLAVE_FAILOVER, RedisCluster::FAILOVER_PREFERRED);
    $this->redis->setOption(RedisCluster::OPT_PREFERRED_NODES, ['127.0.0.1:7000']);

    /* Every key must read back with the value we stored */
    foreach ($arr_expected as $str_stored_key => $stored_value) {
        $this->checkKeyValue($str_stored_key, $arr_types[$str_stored_key], $stored_value);
    }
}

/* Test a 'raw' command */
public function testRawCommand() {
$this->redis->rawCommand('mykey', 'set', 'mykey', 'my-value');
Expand Down
0