Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ $obj_cluster = new RedisCluster('mycluster');
On construction, the RedisCluster class will iterate over the provided seed nodes until it can attain a connection to the cluster and run CLUSTER SLOTS to map every node in the cluster locally. Once the keyspace is mapped, RedisCluster will only connect to nodes when it needs to (e.g. you're getting a key that we believe is on that node.)

## Slot caching
Each time the `RedisCluster` class is constructed from scratch, phpredis needs to execute a `CLUSTER SLOTS` command to map the keyspace. Although this isn't an expensive command, it does require a round trip for each newly created object, which is inefficient. Starting from PhpRedis 5.0.0 these slots can be cached by setting `redis.clusters.cache_slots = 1` in `php.ini`.
Each time the `RedisCluster` class is constructed from scratch, phpredis needs to execute a `CLUSTER SLOTS` command to map the keyspace. Although this isn't an expensive command, it does require a round trip for each newly created object, which is inefficient. Starting from PhpRedis 5.0.0 these slots can be cached by setting `redis.clusters.cache_slots = 1` in `php.ini`.

### Slot cache expiration
You can also configure the cached slot maps to expire after a certain number of seconds. To do this, set `redis.clusters.slot_cache_expiry` to a positive value. Expiring the cache can be beneficial in situations where new replica(s) are added to a cluster while PhpRedis has the topology cached. A non-destructive change like this will not result in `MOVED` or `ASK` redirections from Redis, so PhpRedis won't know to refresh the slot topology.

## Timeouts
Because Redis cluster is intended to provide high availability, timeouts do not work in the same way they do in normal socket communication. It's fully possible to have a timeout or even exception on a given socket (say in the case that a master node has failed), and continue to serve the request if and when a slave can be promoted as the new master.
Expand Down
132 changes: 109 additions & 23 deletions cluster_library.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@
#include "crc16.h"
#include <zend_exceptions.h>

#if PHP_VERSION_ID < 80300
#include "ext/standard/hrtime.h"
#else
#include "Zend/zend_hrtime.h"
#endif

#ifdef HAVE_REDIS_ATOMICS_MMAP
#include <stdatomic.h>
#include <sys/mman.h>

static _Atomic uint64_t *g_cluster_cache_gen;
static pid_t g_cluster_cache_pid;
#endif

extern zend_class_entry *redis_cluster_exception_ce;
int le_cluster_slot_cache;

Expand Down Expand Up @@ -883,6 +897,60 @@ cluster_free(redisCluster *c, int free_ctx)
if (free_ctx) efree(c);
}

/* Return the current high-resolution timer reading in whole seconds.
 *
 * The underlying hrtime value is divided by one billion, i.e. it is treated
 * as a nanosecond count.  The timer source depends on the PHP version being
 * built against.  NOTE(review): this is presumably a monotonic clock rather
 * than wall-clock time, so values are only meaningful relative to each
 * other -- confirm against the hrtime implementation. */
static inline uint64_t redis_time(void) {
    const uint64_t nanos_per_sec = 1000000000;
    uint64_t now_ns;

#if PHP_VERSION_ID < 80300
    now_ns = php_hrtime_current();
#else
    now_ns = zend_hrtime();
#endif

    return now_ns / nanos_per_sec;
}

/* Compute the absolute expiry timestamp for a newly created slot cache.
 *
 * Reads the `redis.clusters.slot_cache_expiry` INI setting.  A value of zero
 * or less means "never expire" and yields 0; otherwise the result is the
 * current redis_time() plus the configured number of seconds. */
static zend_long cluster_cache_expiry(void) {
    zend_long ttl = INI_INT("redis.clusters.slot_cache_expiry");

    return ttl > 0 ? redis_time() + ttl : 0;
}

#ifdef HAVE_REDIS_ATOMICS_MMAP
/* Set up the shared slot cache generation counter.
 *
 * Records the calling process id (so that only this process unmaps the
 * counter later) and maps one anonymous MAP_SHARED region holding the
 * counter.  Anonymous mappings start out zero filled, so the initial
 * generation is 0.
 *
 * If the mapping cannot be created the global pointer is left NULL, which the
 * other cluster_cache_gen_* functions treat as "feature unavailable". */
void cluster_cache_gen_init(void) {
    void *mem;

    g_cluster_cache_pid = getpid();

    mem = mmap(NULL, sizeof(uint64_t), PROT_READ | PROT_WRITE,
               MAP_SHARED | MAP_ANONYMOUS, -1, 0);

    /* mmap signals failure with MAP_FAILED ((void *)-1), not NULL.  Storing
     * it unchanged would defeat every `== NULL` guard and lead to a
     * dereference of an invalid address. */
    g_cluster_cache_gen = (mem == MAP_FAILED) ? NULL : mem;
}

/* Release the shared generation counter.
 *
 * Nothing happens unless the counter is mapped and the calling process id
 * equals the one stored in g_cluster_cache_pid.  After unmapping, the global
 * pointer is reset to NULL. */
void cluster_cache_gen_free(void) {
    if (g_cluster_cache_gen == NULL || g_cluster_cache_pid != getpid())
        return;

    munmap(g_cluster_cache_gen, sizeof(uint64_t));
    g_cluster_cache_gen = NULL;
}

/* Bump the shared generation counter by one.
 *
 * Cached slot maps whose stored generation no longer matches the counter are
 * dropped the next time they are loaded.
 *
 * Returns SUCCESS after incrementing, or FAILURE when the counter is not
 * mapped. */
int cluster_cache_gen_invalidate(void) {
    if (!g_cluster_cache_gen) {
        return FAILURE;
    }

    /* Only atomicity of the increment is requested here (relaxed order) */
    atomic_fetch_add_explicit(g_cluster_cache_gen, 1, memory_order_relaxed);
    return SUCCESS;
}

/* Read the current shared generation value, or 0 when the counter is not
 * mapped. */
static uint64_t cluster_cache_gen(void) {
    return g_cluster_cache_gen ? atomic_load(g_cluster_cache_gen) : 0;
}

#endif

/* Create a cluster slot cache structure */
PHP_REDIS_API
redisCachedCluster *cluster_cache_create(zend_string *hash, HashTable *nodes) {
Expand All @@ -892,6 +960,10 @@ redisCachedCluster *cluster_cache_create(zend_string *hash, HashTable *nodes) {

cc = pecalloc(1, sizeof(*cc), 1);
cc->hash = zend_string_dup(hash, 1);
cc->expiry = cluster_cache_expiry();
#ifdef HAVE_REDIS_ATOMICS_MMAP
cc->generation = cluster_cache_gen();
#endif

/* Copy nodes */
cc->master = pecalloc(zend_hash_num_elements(nodes), sizeof(*cc->master), 1);
Expand Down Expand Up @@ -1597,29 +1669,27 @@ PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char
timedout = c->waitms ? mstime() - msstart >= c->waitms : 0;
} while (!c->clusterdown && !timedout);

// If we've detected the cluster is down, throw an exception
if (c->clusterdown) {
cluster_cache_clear(c);
CLUSTER_THROW_EXCEPTION("The Redis Cluster is down (CLUSTERDOWN)", 0);
return -1;
} else if (timedout || resp == -1) {
// Make sure the socket is reconnected, it such that it is in a clean state
if (c->clusterdown || (timedout || resp == -1)) {
/* Flush slot cache and ensure a reconnection to reread the topology */
redis_sock_disconnect(c->cmd_sock, 1, 1);
cluster_cache_clear(c);

if (timedout) {
CLUSTER_THROW_EXCEPTION("Timed out attempting to find data in the correct node!", 0);
if (c->clusterdown) {
cluster_map_keyspace(c);
CLUSTER_THROW_EXCEPTION("The Redis Cluster is down (CLUSTERDOWN)", 0);
} else if (timedout) {
CLUSTER_THROW_EXCEPTION(
"Timed out attempting to find data in the correct node!", 0);
} else {
CLUSTER_THROW_EXCEPTION("Error processing response from Redis node!", 0);
CLUSTER_THROW_EXCEPTION(
"Error processing response from Redis node!", 0);
}

return -1;
}

/* Clear redirection flag */
/* Clear redirection flag and return success */
c->redir_type = REDIR_NONE;

// Success, return the slot where data exists.
return 0;
}

Expand Down Expand Up @@ -3105,21 +3175,34 @@ zend_string *cluster_hash_seeds(zend_string **seeds, uint32_t count) {
}

/* Look up a cached slot map in the persistent list.
 *
 * hash: key the cache entry was stored under.
 *
 * Returns the cached cluster on a hit.  Returns NULL when:
 *   - no entry exists for `hash`;
 *   - the entry is not a slot cache resource (an E_WARNING is raised);
 *   - the entry carries a non-zero expiry that is at or before redis_time();
 *   - (HAVE_REDIS_ATOMICS_MMAP only) the entry's generation differs from the
 *     current shared generation.
 * In the last two cases the stale entry is also removed from the persistent
 * list. */
PHP_REDIS_API redisCachedCluster *cluster_cache_load(zend_string *hash) {
    redisCachedCluster *cc;
    zend_resource *le;

    /* Look for cached slot information */
    le = zend_hash_find_ptr(&EG(persistent_list), hash);
    if (le == NULL)
        return NULL;

    /* Sanity check on our list type */
    if (le->type != le_cluster_slot_cache) {
        php_error_docref(NULL, E_WARNING, "Invalid slot cache resource");
        return NULL;
    }

    cc = le->ptr;

    /* Short circuit if it should be expired (an expiry of 0 never expires) */
    if (cc->expiry != 0 && cc->expiry <= redis_time())
        goto invalidated;

#ifdef HAVE_REDIS_ATOMICS_MMAP
    /* Short circuit if it has been globally invalidated */
    if (cluster_cache_gen() != cc->generation)
        goto invalidated;
#endif

    return cc;

invalidated:
    /* Drop the stale entry */
    zend_hash_del(&EG(persistent_list), hash);
    return NULL;
}

Expand All @@ -3130,11 +3213,14 @@ PHP_REDIS_API void cluster_cache_store(zend_string *hash, HashTable *nodes) {
redis_register_persistent_resource(cc->hash, cc, le_cluster_slot_cache);
}

/* Flush the slot cache for the provided cluster, if one exists.
 *
 * c: cluster context; its cache_key (when set) names the persistent list
 *    entry to delete.
 *
 * Returns the result of zend_hash_del() when a cache key is set, FAILURE
 * otherwise.  Success and failure in this context just mean "did we remove
 * it". */
int cluster_cache_clear(redisCluster *c) {
    if (c->cache_key == NULL)
        return FAILURE;

    return zend_hash_del(&EG(persistent_list), c->cache_key);
}


Expand Down
18 changes: 16 additions & 2 deletions cluster_library.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
#include "TSRM.h"
#endif

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

/* Redis cluster hash slots and N-1 which we'll use to find it */
#define REDIS_CLUSTER_SLOTS 16384
#define REDIS_CLUSTER_MOD (REDIS_CLUSTER_SLOTS-1)
Expand Down Expand Up @@ -157,10 +161,13 @@ typedef struct redisCachedMaster {
} redisCachedMaster;

/* A cached copy of a cluster's slot map, stored in the persistent list and
 * looked up by the hash of the seeds it was built from. */
typedef struct redisCachedCluster {
zend_string *hash; /* What we're cached by */
redisCachedMaster *master; /* Array of masters */
size_t count; /* Number of masters */
uint64_t expiry; /* Expiry time in redis_time() seconds; 0 = never expires */
#ifdef HAVE_REDIS_ATOMICS_MMAP
uint64_t generation; /* Shared invalidation generation captured at creation */
#endif
} redisCachedCluster;

/* A Redis Cluster master node */
Expand Down Expand Up @@ -386,13 +393,20 @@ PHP_REDIS_API redisCachedCluster *cluster_cache_create(zend_string *hash, HashTa
PHP_REDIS_API void cluster_cache_free(redisCachedCluster *rcc);
PHP_REDIS_API void cluster_init_cache(redisCluster *c, redisCachedCluster *rcc);

/* Conditionally compiled shared slot cache invalidation functions */
#ifdef HAVE_REDIS_ATOMICS_MMAP
void cluster_cache_gen_init(void);
void cluster_cache_gen_free(void);
int cluster_cache_gen_invalidate(void);
#endif

/* Functions to facilitate cluster slot caching */

PHP_REDIS_API char **cluster_sock_read_multibulk_reply(RedisSock *redis_sock, int *len);

PHP_REDIS_API void cluster_cache_store(zend_string *hash, HashTable *nodes);
PHP_REDIS_API redisCachedCluster *cluster_cache_load(zend_string *hash);
void cluster_cache_clear(redisCluster *c);
int cluster_cache_clear(redisCluster *c);

/*
* Redis Cluster response handlers. Our response handlers generally take the
Expand Down
19 changes: 19 additions & 0 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,25 @@ if test "$PHP_REDIS" != "no"; then
fi
fi

dnl Check if we can use C11 atomics and anonymous shared mmap
AC_LINK_IFELSE(
[AC_LANG_PROGRAM([[
#include <stddef.h>
#include <stdatomic.h>
#include <sys/mman.h>
]], [[
static _Atomic int test = 0;
void *ptr = mmap(NULL, 8, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if (ptr == (void *)-1) return 1;
atomic_fetch_add(&test, 1);
return 0;
]])],
[AC_DEFINE([HAVE_REDIS_ATOMICS_MMAP], [1],
[Define if C11 atomics and MAP_SHARED|MAP_ANONYMOUS mmap are usable])],
[]
)

AC_CHECK_PROG([GIT], [git], [yes], [no])
if test "$GIT" = "yes" && test -d "$srcdir/.git"; then
AC_DEFINE_UNQUOTED(GIT_REVISION, ["$(git log -1 --format=%H)"], [ ])
Expand Down
21 changes: 20 additions & 1 deletion redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ PHP_INI_BEGIN()

/* redis cluster */
PHP_INI_ENTRY("redis.clusters.cache_slots", "0", PHP_INI_ALL, NULL)
PHP_INI_ENTRY("redis.clusters.slot_cache_expiry", "0", PHP_INI_ALL, NULL)

#ifdef HAVE_REDIS_ATOMICS_MMAP
PHP_INI_ENTRY("redis.clusters.shared_slot_cache_invalidation", "0",
PHP_INI_ALL, NULL)
#endif
PHP_INI_ENTRY("redis.clusters.auth", "", PHP_INI_ALL, NULL)
PHP_INI_ENTRY("redis.clusters.persistent", "0", PHP_INI_ALL, NULL)
PHP_INI_ENTRY("redis.clusters.read_timeout", "0", PHP_INI_ALL, NULL)
Expand Down Expand Up @@ -144,7 +150,7 @@ zend_module_entry redis_module_entry = {
"redis",
NULL,
PHP_MINIT(redis),
NULL,
PHP_MSHUTDOWN(redis),
NULL,
NULL,
PHP_MINFO(redis),
Expand Down Expand Up @@ -379,6 +385,11 @@ PHP_MINIT_FUNCTION(redis)
"Redis cluster slot cache",
module_number);

#ifdef HAVE_REDIS_ATOMICS_MMAP
/* Initialize shared slot cache invalidation */
cluster_cache_gen_init();
#endif

/* RedisException class */
redis_exception_ce = register_class_RedisException(spl_ce_RuntimeException);

Expand All @@ -394,6 +405,14 @@ PHP_MINIT_FUNCTION(redis)
return SUCCESS;
}

/* Module shutdown hook.  When shared slot cache invalidation is compiled in,
 * release the shared generation counter.  Always reports SUCCESS. */
PHP_MSHUTDOWN_FUNCTION(redis)
{
#ifdef HAVE_REDIS_ATOMICS_MMAP
    /* Tear down shared slot cache invalidation */
    cluster_cache_gen_free();
#endif
    return SUCCESS;
}

static const char *
get_available_serializers(void)
{
Expand Down
21 changes: 21 additions & 0 deletions redis_cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,27 @@ static void redisClearNodeBytes(redisClusterNode *node) {
}
}

PHP_METHOD(RedisCluster, flushSlotCache) {
redisCluster *c = GET_CONTEXT();

ZEND_PARSE_PARAMETERS_NONE();

RETURN_BOOL(cluster_cache_clear(c) == SUCCESS);
}

#ifdef HAVE_REDIS_ATOMICS_MMAP
/* {{{ proto bool RedisCluster::invalidateSlotCaches()
 * Bump the shared slot cache generation.  Requires the
 * `redis.clusters.shared_slot_cache_invalidation` INI setting to be non-zero;
 * when it is zero an E_WARNING is raised and false is returned.  Otherwise
 * returns whether cluster_cache_gen_invalidate() succeeded. */
PHP_METHOD(RedisCluster, invalidateSlotCaches) {
    ZEND_PARSE_PARAMETERS_NONE();

    if (!INI_INT("redis.clusters.shared_slot_cache_invalidation")) {
        php_error_docref(NULL, E_WARNING, "Shared slot cache invalidation disabled");
        RETURN_FALSE;
    }

    if (cluster_cache_gen_invalidate() == SUCCESS) {
        RETURN_TRUE;
    }

    RETURN_FALSE;
}
/* }}} */
#endif

PHP_METHOD(RedisCluster, gettransferredbytes) {
redisCluster *c = GET_CONTEXT();
zend_long rx = 0, tx = 0;
Expand Down
16 changes: 16 additions & 0 deletions redis_cluster.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ public function _masters(): array;

public function _redir(): string|null;

/**
* Flush the persistent slot cache, if one exists.
* @return bool Whether the slot cache was flushed.
*/
public function flushSlotCache(): bool;

#ifdef HAVE_REDIS_ATOMICS_MMAP
/**
* Invalidate all slot caches across all workers. Only available on
* Linux-like systems with C11 atomics and shared memory allocation.
*
* @return bool Whether the slot caches could be invalidated.
*/
public static function invalidateSlotCaches(): bool;
#endif

/**
* @see Redis::acl
*/
Expand Down
Loading
Loading