Quellcode durchsuchen

db_redis: use cluster api

Riccardo Villa vor 3 Jahren
Ursprung
Commit
8eec0115a2

+ 112 - 3
src/modules/db_redis/redis_connection.c

@@ -25,6 +25,15 @@
 #include "redis_table.h"
 #include "redis_table.h"
 #include "redis_dbase.h"
 #include "redis_dbase.h"
 
 
+#ifdef WITH_HIREDIS_CLUSTER
+static unsigned int MAX_URL_LENGTH = 1023;
+#define redisCommand redisClusterCommand
+#define redisFree redisClusterFree
+#define redisCommandArgv redisClusterCommandArgv
+#define redisAppendCommandArgv redisClusterAppendCommandArgv
+#define redisGetReply redisClusterGetReply
+#endif
+
 extern int db_redis_verbosity;
 extern int db_redis_verbosity;
 
 
 static void print_query(redis_key_t *query) {
 static void print_query(redis_key_t *query) {
@@ -104,21 +113,59 @@ static redis_key_t* db_redis_shift_query(km_redis_con_t *con) {
 int db_redis_connect(km_redis_con_t *con) {
 int db_redis_connect(km_redis_con_t *con) {
     struct timeval tv;
     struct timeval tv;
     redisReply *reply;
     redisReply *reply;
+#ifndef WITH_HIREDIS_CLUSTER
     int db;
     int db;
+#endif
 
 
     tv.tv_sec = 1;
     tv.tv_sec = 1;
     tv.tv_usec = 0;
     tv.tv_usec = 0;
-
+#ifndef WITH_HIREDIS_CLUSTER
     db = atoi(con->id->database);
     db = atoi(con->id->database);
+#endif
     reply = NULL;
     reply = NULL;
 
 
     // TODO: introduce require_master mod-param and check if we're indeed master
     // TODO: introduce require_master mod-param and check if we're indeed master
     // TODO: on carrier, if we have db fail-over, the currently connected
     // TODO: on carrier, if we have db fail-over, the currently connected
     // redis server will become slave without dropping connections?
     // redis server will become slave without dropping connections?
-
+#ifdef WITH_HIREDIS_CLUSTER
+    int status;
+    char hosts[MAX_URL_LENGTH];
+    char* host_begin;
+    char* host_end;
+    LM_DBG("connecting to redis cluster at %.*s\n", con->id->url.len, con->id->url.s);
+    host_begin = strstr(con->id->url.s, "redis://");
+    if (host_begin) {
+        host_begin += 8;
+    } else {
+        LM_ERR("invalid url scheme\n");
+        goto err;
+    }
+    host_end = strstr(host_begin, "/");
+    if (! host_end) {
+        LM_ERR("invalid url: cannot find end of host part\n");
+        goto err;
+    }
+    if ((host_end - host_begin) > (MAX_URL_LENGTH-1)) {
+        LM_ERR("url too long\n");
+        goto err;
+    }
+    strncpy(hosts, host_begin, (host_end - host_begin));
+    hosts[MAX_URL_LENGTH-1] = '\0';
+    con->con = redisClusterContextInit();
+    if (! con->con) {
+        LM_ERR("no private memory left\n");
+        goto err;
+    }
+    redisClusterSetOptionAddNodes(con->con, hosts);
+    redisClusterSetOptionConnectTimeout(con->con, tv);
+    status = redisClusterConnect2(con->con);
+    if (status != REDIS_OK) {
+        LM_ERR("cannot open connection to cluster with hosts: %s, error: %s\n", hosts, con->con->errstr);
+        goto err;
+    }
+#else
     LM_DBG("connecting to redis at %s:%d\n", con->id->host, con->id->port);
     LM_DBG("connecting to redis at %s:%d\n", con->id->host, con->id->port);
     con->con = redisConnectWithTimeout(con->id->host, con->id->port, tv);
     con->con = redisConnectWithTimeout(con->id->host, con->id->port, tv);
-
     if (!con->con) {
     if (!con->con) {
         LM_ERR("cannot open connection: %.*s\n", con->id->url.len, con->id->url.s);
         LM_ERR("cannot open connection: %.*s\n", con->id->url.len, con->id->url.s);
         goto err;
         goto err;
@@ -128,6 +175,7 @@ int db_redis_connect(km_redis_con_t *con) {
             con->con->errstr);
             con->con->errstr);
         goto err;
         goto err;
     }
     }
+#endif
 
 
     if (con->id->password) {
     if (con->id->password) {
         reply = redisCommand(con->con, "AUTH %s", con->id->password);
         reply = redisCommand(con->con, "AUTH %s", con->id->password);
@@ -144,6 +192,7 @@ int db_redis_connect(km_redis_con_t *con) {
         freeReplyObject(reply); reply = NULL;
         freeReplyObject(reply); reply = NULL;
     }
     }
 
 
+#ifndef WITH_HIREDIS_CLUSTER
     reply = redisCommand(con->con, "PING");
     reply = redisCommand(con->con, "PING");
     if (!reply) {
     if (!reply) {
         LM_ERR("cannot ping server on connection %.*s: %s\n",
         LM_ERR("cannot ping server on connection %.*s: %s\n",
@@ -169,8 +218,10 @@ int db_redis_connect(km_redis_con_t *con) {
         goto err;
         goto err;
     }
     }
     freeReplyObject(reply); reply = NULL;
     freeReplyObject(reply); reply = NULL;
+#endif
     LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
     LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
 
 
+#ifndef WITH_HIREDIS_CLUSTER
     reply = redisCommand(con->con, "SCRIPT LOAD %s", SREM_KEY_LUA);
     reply = redisCommand(con->con, "SCRIPT LOAD %s", SREM_KEY_LUA);
     if (!reply) {
     if (!reply) {
         LM_ERR("failed to load LUA script to server %.*s: %s\n",
         LM_ERR("failed to load LUA script to server %.*s: %s\n",
@@ -194,6 +245,7 @@ int db_redis_connect(km_redis_con_t *con) {
     }
     }
     strcpy(con->srem_key_lua, reply->str);
     strcpy(con->srem_key_lua, reply->str);
     freeReplyObject(reply); reply = NULL;
     freeReplyObject(reply); reply = NULL;
+#endif
     LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
     LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
 
 
     return 0;
     return 0;
@@ -291,6 +343,63 @@ void db_redis_free_connection(struct pool_con* con) {
     pkg_free(_c);
     pkg_free(_c);
 }
 }
 
 
+#ifdef WITH_HIREDIS_CLUSTER
+void *db_redis_command_argv_to_node(km_redis_con_t *con, redis_key_t *query, cluster_node *node) {
+    char **argv = NULL;
+    int argc;
+    #define MAX_CMD_LENGTH 256
+    char cmd[MAX_CMD_LENGTH] = "";
+    size_t cmd_len = MAX_CMD_LENGTH - 1;
+    int i;
+
+    print_query(query);
+
+    argc = db_redis_key_list2arr(query, &argv);
+    if (argc < 0) {
+        LM_ERR("Failed to allocate memory for query array\n");
+        return NULL;
+    }
+    LM_DBG("query has %d args\n", argc);
+
+    for (i=0; i<argc; i++)
+    {
+        size_t arg_len = strlen(argv[i]);
+        if (arg_len > cmd_len)
+                        break;
+        strncat(cmd, argv[i], cmd_len);
+        cmd_len = cmd_len - arg_len;
+        if (cmd_len == 0)
+            break;
+        
+        if (i != argc - 1) {
+            strncat(cmd, " ", cmd_len);
+            cmd_len--;
+        }
+
+    }
+
+    LM_DBG("cmd is %s\n", cmd);
+
+    redisReply *reply = redisClusterCommandToNode(con->con, node, cmd);
+    if (con->con->err != REDIS_OK) {
+        LM_DBG("redis connection is gone, try reconnect. (%d:%s)\n", con->con->err, con->con->errstr);
+        if (db_redis_connect(con) != 0) {
+            LM_ERR("Failed to reconnect to redis db\n");
+            pkg_free(argv);
+            if (con->con) {
+                redisFree(con->con);
+                con->con = NULL;
+            }
+            return NULL;
+        }
+        reply = redisClusterCommandToNode(con->con, node, cmd);
+    }
+    pkg_free(argv);
+    return reply;
+}
+    
+#endif
+
 void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
 void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
     char **argv = NULL;
     char **argv = NULL;
     int argc;
     int argc;

+ 33 - 1
src/modules/db_redis/redis_connection.h

@@ -23,14 +23,19 @@
 #ifndef _REDIS_CONNECTION_H_
 #ifndef _REDIS_CONNECTION_H_
 #define _REDIS_CONNECTION_H_
 #define _REDIS_CONNECTION_H_
 
 
+#ifdef WITH_HIREDIS_CLUSTER
+#include <hircluster.h>
+#else
 #ifdef WITH_HIREDIS_PATH
 #ifdef WITH_HIREDIS_PATH
 #include <hiredis/hiredis.h>
 #include <hiredis/hiredis.h>
 #else
 #else
 #include <hiredis.h>
 #include <hiredis.h>
 #endif
 #endif
+#endif
 
 
 #include "db_redis_mod.h"
 #include "db_redis_mod.h"
 
 
+#ifndef WITH_REDIS_CLUSTER
 #define db_redis_check_reply(con, reply, err) do { \
 #define db_redis_check_reply(con, reply, err) do { \
     if (!(reply) && !(con)->con) { \
     if (!(reply) && !(con)->con) { \
         LM_ERR("Failed to fetch type entry: no connection to server\n"); \
         LM_ERR("Failed to fetch type entry: no connection to server\n"); \
@@ -49,6 +54,26 @@
         goto err; \
         goto err; \
     } \
     } \
 } while(0);
 } while(0);
+#else
+#define db_redis_check_reply(con, reply, err) do { \
+    if (!(reply) && !(con)->con) { \
+        LM_ERR("Failed to fetch type entry: no connection to server\n"); \
+        goto err; \
+    } \
+    if (!(reply)) { \
+        LM_ERR("Failed to fetch type entry: %s\n", \
+                (con)->con->errstr); \
+        redisClusterFree((con)->con); \
+        (con)->con = NULL; \
+        goto err; \
+    } \
+    if ((reply)->type == REDIS_REPLY_ERROR) { \
+        LM_ERR("Failed to fetch type entry: %s\n", \
+                (reply)->str); \
+        goto err; \
+    } \
+} while(0);
+#endif
 
 
 typedef struct redis_key redis_key_t;
 typedef struct redis_key redis_key_t;
 
 
@@ -61,8 +86,11 @@ typedef struct km_redis_con {
     struct db_id* id;
     struct db_id* id;
     unsigned int ref;
     unsigned int ref;
     struct pool_con* next;
     struct pool_con* next;
-
+#ifdef WITH_HIREDIS_CLUSTER
+    redisClusterContext *con;
+#else
     redisContext *con;
     redisContext *con;
+#endif
     redis_command_t *command_queue;
     redis_command_t *command_queue;
     unsigned int append_counter;
     unsigned int append_counter;
     struct str_hash_table tables;
     struct str_hash_table tables;
@@ -86,4 +114,8 @@ void db_redis_consume_replies(km_redis_con_t *con);
 void db_redis_free_reply(redisReply **reply);
 void db_redis_free_reply(redisReply **reply);
 const char *db_redis_get_error(km_redis_con_t *con);
 const char *db_redis_get_error(km_redis_con_t *con);
 
 
+#ifdef WITH_HIREDIS_CLUSTER
+void *db_redis_command_argv_to_node(km_redis_con_t *con, redis_key_t *query, cluster_node *node);
+#endif
+
 #endif /* _REDIS_CONNECTION_H_ */
 #endif /* _REDIS_CONNECTION_H_ */

+ 133 - 2
src/modules/db_redis/redis_dbase.c

@@ -832,10 +832,23 @@ static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *matc
         goto err;
         goto err;
     }
     }
 
 
+#ifdef WITH_HIREDIS_CLUSTER
+nodeIterator niter;
+cluster_node *node;
+initNodeIterator(&niter, con->con);
+while ((node = nodeNext(&niter)) != NULL) {
+    if (node->role != REDIS_ROLE_MASTER)
+        continue;
+    reply = db_redis_command_argv_to_node(con, query_v, node);
+    if (!reply) {
+        LM_ERR("Invalid null reply from node %s\n", node->addr);
+        goto err;
+    }
+
+#else
     reply = db_redis_command_argv(con, query_v);
     reply = db_redis_command_argv(con, query_v);
-    db_redis_key_free(&query_v);
+#endif
     db_redis_check_reply(con, reply, err);
     db_redis_check_reply(con, reply, err);
-
     keys_list = reply;
     keys_list = reply;
 
 
 #endif
 #endif
@@ -880,6 +893,10 @@ static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *matc
     } while (cursor > 0);
     } while (cursor > 0);
 #endif
 #endif
 
 
+#ifdef WITH_HIREDIS_CLUSTER
+    }
+#endif    
+
     // for full table scans, we have to manually match all given keys
     // for full table scans, we have to manually match all given keys
     // but only do this once for repeated invocations
     // but only do this once for repeated invocations
     if (!*manual_keys) {
     if (!*manual_keys) {
@@ -898,6 +915,8 @@ static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *matc
     if (reply) {
     if (reply) {
         db_redis_free_reply(&reply);
         db_redis_free_reply(&reply);
     }
     }
+    
+    db_redis_key_free(&query_v);
 
 
     LM_DBG("got %lu entries by scan\n", (unsigned long) i);
     LM_DBG("got %lu entries by scan\n", (unsigned long) i);
     return 0;
     return 0;
@@ -1636,6 +1655,10 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con
     redis_key_t *type_key;
     redis_key_t *type_key;
     redis_key_t *set_key;
     redis_key_t *set_key;
 
 
+#ifdef WITH_HIREDIS_CLUSTER
+    long long scard;
+#endif
+
     if (!*keys_count && do_table_scan) {
     if (!*keys_count && do_table_scan) {
         if (!ts_scan_start)
         if (!ts_scan_start)
             LM_WARN("performing full table scan on table '%.*s' while performing delete\n",
             LM_WARN("performing full table scan on table '%.*s' while performing delete\n",
@@ -1806,6 +1829,57 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con
         for (type_key = type_keys, set_key = set_keys; type_key;
         for (type_key = type_keys, set_key = set_keys; type_key;
                 type_key = type_key->next, set_key = set_key->next) {
                 type_key = type_key->next, set_key = set_key->next) {
 
 
+#ifdef WITH_HIREDIS_CLUSTER
+            if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) {
+                LM_ERR("Failed to add srem command to post-delete query\n");
+                goto error;
+            }
+            if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
+                LM_ERR("Failed to add key to delete query\n");
+                goto error;
+            }
+            if (db_redis_key_add_str(&query_v, key) != 0) {
+                LM_ERR("Failed to add key to delete query\n");
+                goto error;
+            }
+            reply = db_redis_command_argv(con, query_v);
+            db_redis_key_free(&query_v);
+            db_redis_check_reply(con, reply, error);
+            db_redis_free_reply(&reply);
+
+            if (db_redis_key_add_string(&query_v, "SCARD", 5) != 0) {
+                LM_ERR("Failed to add scard command to post-delete query\n");
+                goto error;
+            }
+            if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
+                LM_ERR("Failed to add key to delete query\n");
+                goto error;
+            }
+            reply = db_redis_command_argv(con, query_v);
+            db_redis_key_free(&query_v);
+            db_redis_check_reply(con, reply, error);
+            scard = reply->integer;
+            db_redis_free_reply(&reply);
+
+            if (scard != 0)
+                continue;
+            
+            if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) {
+                LM_ERR("Failed to add srem command to post-delete query\n");
+                goto error;
+            }
+            if (db_redis_key_add_str(&query_v, &set_key->key) != 0) {
+                LM_ERR("Failed to add key to delete query\n");
+                goto error;
+            }
+            if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
+                LM_ERR("Failed to add key to delete query\n");
+                goto error;
+            }
+            reply = db_redis_command_argv(con, query_v);
+            db_redis_key_free(&query_v);
+            db_redis_check_reply(con, reply, error);
+#else
             if (db_redis_key_add_string(&query_v, "EVALSHA", 7) != 0) {
             if (db_redis_key_add_string(&query_v, "EVALSHA", 7) != 0) {
                 LM_ERR("Failed to add srem command to post-delete query\n");
                 LM_ERR("Failed to add srem command to post-delete query\n");
                 goto error;
                 goto error;
@@ -1834,6 +1908,7 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con
             db_redis_key_free(&query_v);
             db_redis_key_free(&query_v);
             db_redis_check_reply(con, reply, error);
             db_redis_check_reply(con, reply, error);
             db_redis_free_reply(&reply);
             db_redis_free_reply(&reply);
+#endif
         }
         }
         LM_DBG("done with loop '%.*s'\n", k->key.len, k->key.s);
         LM_DBG("done with loop '%.*s'\n", k->key.len, k->key.s);
         db_redis_key_free(&type_keys);
         db_redis_key_free(&type_keys);
@@ -1883,6 +1958,9 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con
     redis_key_t *new_type_keys = NULL;
     redis_key_t *new_type_keys = NULL;
     int new_type_keys_count = 0;
     int new_type_keys_count = 0;
     redis_key_t *all_type_key;
     redis_key_t *all_type_key;
+#ifdef WITH_HIREDIS_CLUSTER
+    long long scard;
+#endif
 
 
     if (!(*keys_count) && do_table_scan) {
     if (!(*keys_count) && do_table_scan) {
         LM_WARN("performing full table scan on table '%.*s' while performing update\n",
         LM_WARN("performing full table scan on table '%.*s' while performing update\n",
@@ -2194,6 +2272,58 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con
 
 
                 db_redis_key_free(&query_v);
                 db_redis_key_free(&query_v);
 
 
+#ifdef WITH_HIREDIS_CLUSTER
+                if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) {
+                    LM_ERR("Failed to add srem command to post-delete query\n");
+                    goto error;
+                }
+                if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
+                    LM_ERR("Failed to add key to delete query\n");
+                    goto error;
+                }
+                if (db_redis_key_add_str(&query_v, key) != 0) {
+                    LM_ERR("Failed to add key to delete query\n");
+                    goto error;
+                }
+                reply = db_redis_command_argv(con, query_v);
+                db_redis_key_free(&query_v);
+                db_redis_check_reply(con, reply, error);
+                db_redis_free_reply(&reply);
+
+                if (db_redis_key_add_string(&query_v, "SCARD", 5) != 0) {
+                    LM_ERR("Failed to add scard command to post-delete query\n");
+                    goto error;
+                }
+                if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
+                    LM_ERR("Failed to add key to delete query\n");
+                    goto error;
+                }
+                reply = db_redis_command_argv(con, query_v);
+                db_redis_key_free(&query_v);
+                db_redis_check_reply(con, reply, error);
+                scard = reply->integer;
+                db_redis_free_reply(&reply);
+
+                if (scard != 0)
+                    continue;
+                
+                if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) {
+                    LM_ERR("Failed to add srem command to post-delete query\n");
+                    goto error;
+                }
+                if (db_redis_key_add_str(&query_v, &set_key->key) != 0) {
+                    LM_ERR("Failed to add key to delete query\n");
+                    goto error;
+                }
+                if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
+                    LM_ERR("Failed to add key to delete query\n");
+                    goto error;
+                }
+                reply = db_redis_command_argv(con, query_v);
+                db_redis_key_free(&query_v);
+                db_redis_check_reply(con, reply, error);
+                update_queries++;
+#else
                 if (db_redis_key_add_string(&query_v, "EVAL", 4) != 0) {
                 if (db_redis_key_add_string(&query_v, "EVAL", 4) != 0) {
                     LM_ERR("Failed to add srem command to post-delete query\n");
                     LM_ERR("Failed to add srem command to post-delete query\n");
                     goto error;
                     goto error;
@@ -2226,6 +2356,7 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con
                 }
                 }
 
 
                 db_redis_key_free(&query_v);
                 db_redis_key_free(&query_v);
+#endif
             }
             }
         }
         }