소스 검색

ndb_redis: dynamic discovery of nodes

Giacomo Vacca 7 년 전
부모
커밋
5ea801500e
3개의 변경된 파일83개의 추가작업 그리고 2개의 파일을 삭제
  1. 30 0
      src/modules/ndb_redis/doc/ndb_redis_admin.xml
  2. 2 0
      src/modules/ndb_redis/ndb_redis_mod.c
  3. 51 2
      src/modules/ndb_redis/redis_client.c

+ 30 - 0
src/modules/ndb_redis/doc/ndb_redis_admin.xml

@@ -267,6 +267,36 @@ modparam("ndb_redis", "disable_time", 30)
 			<programlisting format="linespecific">
 ...
 modparam("ndb_redis", "flush_on_reconnect", 1)
+...
+			</programlisting>
+		</example>
+	</section>
+	<section id="ndb_redis.p.allow_dynamic_nodes">
+		<title><varname>allow_dynamic_nodes</varname> (integer)</title>
+		<para>
+			If set to 1, the module will connect to servers indicated in the "MOVED" reply,
+			even if they are not specified at startup.
+		</para>
+		<para>
+			When <quote>cluster</quote> is enabled the module supports redirections ("MOVED") replies.
+			Set <quote>allow_dynamic_nodes</quote> to 1 to avoid listing all the nodes at startup.
+			You can only list one node, e.g. by using a DNS name for the cluster instead of an IP address.
+			The module will add dynamically the new nodes as the MOVED replies are received.
+			Only works if <quote>cluster</quote> is set to 1.
+		</para>
+		<para>
+		<emphasis>
+			Default value is <quote>0</quote> (disabled).
+		</emphasis>
+		</para>
+		<example>
+			<title>Set <varname>allow_dynamic_nodes</varname> parameter</title>
+			<programlisting format="linespecific">
+...
+modparam("ndb_redis", "server", "name=redis_cluster;addr=127.0.0.1;port=26008")
+...
+modparam("ndb_redis", "cluster", 1)
+modparam("ndb_redis", "allow_dynamic_nodes", 1)
 ...
 			</programlisting>
 		</example>

+ 2 - 0
src/modules/ndb_redis/ndb_redis_mod.c

@@ -52,6 +52,7 @@ int redis_cluster_param = 0;
 int redis_disable_time_param=0;
 int redis_allowed_timeouts_param=-1;
 int redis_flush_on_reconnect_param=0;
+int redis_allow_dynamic_nodes_param = 0;
 
 static int w_redis_cmd3(struct sip_msg* msg, char* ssrv, char* scmd,
 		char* sres);
@@ -127,6 +128,7 @@ static param_export_t params[]={
 	{"disable_time", INT_PARAM, &redis_disable_time_param},
 	{"allowed_timeouts", INT_PARAM, &redis_allowed_timeouts_param},
 	{"flush_on_reconnect", INT_PARAM, &redis_flush_on_reconnect_param},
+	{"allow_dynamic_nodes", INT_PARAM, &redis_allow_dynamic_nodes_param},
 	{0, 0, 0}
 };
 

+ 51 - 2
src/modules/ndb_redis/redis_client.c

@@ -52,6 +52,7 @@ extern int redis_cluster_param;
 extern int redis_disable_time_param;
 extern int redis_allowed_timeouts_param;
 extern int redis_flush_on_reconnect_param;
+extern int redis_allow_dynamic_nodes_param;
 
 /* backwards compatibility with hiredis < 0.12 */
 #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
@@ -651,6 +652,9 @@ int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) {
 	char buffername[100];
 	unsigned int port;
 	str addr = {0, 0}, tmpstr = {0, 0}, name = {0, 0};
+	int server_len = 0;
+	char spec_new[100];
+
 	if (redis_cluster_param) {
 		LM_DBG("Redis replied: \"%.*s\"\n", reply->len, reply->str);
 		if ((reply->len > 7) && (strncmp(reply->str, "MOVED", 5) == 0)) {
@@ -683,9 +687,54 @@ int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) {
 				LM_DBG("Reusing Connection\n");
 				*rsrv = rsrv_new;
 				return 1;
-			} else {
-				LM_ERR("No Connection with name (%.*s)\n", name.len, name.s);
 			}
+			/* New param redis_allow_dynamic_nodes_param:
+			if set, we allow ndb_redis to add nodes that were
+			not defined explicitly in the module configuration */
+			else if (redis_allow_dynamic_nodes_param) {
+                                /* For now the only way this can work is if
+				the new node is accessible with default
+				parameters for sock and db */
+                                memset(spec_new, 0, sizeof(spec_new));
+                                server_len = snprintf(spec_new, sizeof(spec_new) - 1, "name=%.*s;addr=%.*s;port=%i", name.len, name.s, addr.len, addr.s, port);
+
+                                char* server_new = (char*)pkg_malloc(server_len + 1);
+                                if (server_new == NULL) {
+                                        LM_ERR("Error allocating pkg mem\n");
+                                        pkg_free(server_new);
+                                        return 0;
+                                }
+
+                                strncpy(server_new, spec_new, server_len);
+                                server_new[server_len] = '\0';
+
+                                if (redisc_add_server(server_new) == 0) {
+                                        rsrv_new = redisc_get_server(&name);
+
+                                        if (rsrv_new) {
+                                                *rsrv = rsrv_new;
+						// Need to connect to the new server now
+                                                if(redisc_reconnect_server(rsrv_new)==0) {
+							LM_DBG("Connected to the new server with name: %.*s\n", name.len, name.s);
+							return 1;
+                                                }
+                                                else {
+							LM_ERR("ERROR connecting to the new server with name: %.*s\n", name.len, name.s);
+							return 0;
+                                                }
+                                        }
+                                        else {
+                                                /* Adding the new node failed
+                                                Cannot perform redirection */
+                                                LM_ERR("No new connection with name (%.*s) was created\n", name.len, name.s);
+                                        }
+                                }
+                                else {
+                                        LM_ERR("Could not add a new connection with name %.*s\n", name.len, name.s);
+                                }
+                        } else {
+                                LM_ERR("No Connection with name (%.*s)\n", name.len, name.s);
+                        }
 		}
 	}
 	return 0;