Browse Source

ndb_redis: add disable server on failure feature
- if a server fails multiple consecutive times
it is disabled temporarily and commands to it
will not do anything.

Claudiu Boriga 8 years ago
parent
commit
d0101010b8

+ 58 - 0
src/modules/ndb_redis/doc/ndb_redis_admin.xml

@@ -177,6 +177,64 @@ modparam("ndb_redis", "server", "name=127.0.0.1:26004;addr=127.0.0.1;port=26004"
 modparam("ndb_redis", "server", "name=127.0.0.1:26008;addr=127.0.0.1;port=26008")
 ...
 modparam("ndb_redis", "cluster", 1)
+...
+			</programlisting>
+		</example>
+	</section>
+	<section id="ndb_redis.p.allowed_timeouts">
+		<title><varname>allowed_timeouts</varname> (integer)</title>
+		<para>
+			If this is set to a non-negative value, it sets the number
+			of consecutive REDIS commands that can fail before temporarily
+			disabling the REDIS server. This is similar to rtpengine_disable_tout
+			parameter from the rtpengine module.
+		</para>
+		<para>
+			When communicating with a REDIS server, if redis_cmd or redis_execute
+			will fail for more than <quote>allowed_timeouts</quote> consecutive
+			times, the server will be temporary disabled for a number of seconds 
+			configured by the <quote>disable_time</quote> parameter.
+		</para>
+		<para>
+			Disabling a server means that further redis_cmd and redis_execute commands
+			will not do anything and return a negative value <quote>-2</quote>. 
+			Messages are also logged when disabling and re-enabling a server.
+		</para>
+		<para>
+			The number of consecutive fails are counted by each Kamailio process, 
+			so when disabling a server this is done just for that process, not globally.
+		</para>
+		<para>
+		<emphasis>
+			Default value is <quote>-1</quote> (disabled).
+		</emphasis>
+		</para>
+		<example>
+			<title>Set <varname>allowed_timeots</varname> parameter</title>
+			<programlisting format="linespecific">
+...
+modparam("ndb_redis", "allowed_timeouts", 3)
+...
+			</programlisting>
+		</example>
+	</section>
+	<section id="ndb_redis.p.disable_time">
+		<title><varname>disable_time</varname> (integer)</title>
+		<para>
+			If allowed_timeouts is set to a non negative value this determines the 
+			number of seconds the REDIS server will be disabled
+		</para>
+		<para>
+		<emphasis>
+			Default value is <quote>0</quote>.
+		</emphasis>
+		</para>
+		<example>
+			<title>Set <varname>disable_time</varname> parameter</title>
+			<programlisting format="linespecific">
+...
+modparam("ndb_redis", "allowed_timeouts", 0)
+modparam("ndb_redis", "disable_time", 30)
 ...
 			</programlisting>
 		</example>

+ 4 - 0
src/modules/ndb_redis/ndb_redis_mod.c

@@ -48,6 +48,8 @@ int init_without_redis = 0;
 int redis_connect_timeout_param = 1000;
 int redis_cmd_timeout_param = 1000;
 int redis_cluster_param = 0;
+int disable_time=0;
+int allowed_timeouts=-1;
 
 static int w_redis_cmd3(struct sip_msg* msg, char* ssrv, char* scmd,
 		char* sres);
@@ -120,6 +122,8 @@ static param_export_t params[]={
 	{"connect_timeout", INT_PARAM, &redis_connect_timeout_param},
 	{"cmd_timeout", INT_PARAM, &redis_cmd_timeout_param},
 	{"cluster", INT_PARAM, &redis_cluster_param},
+	{"disable_time", INT_PARAM, &disable_time},
+	{"allowed_timeouts", INT_PARAM, &allowed_timeouts},
 	{0, 0, 0}
 };
 

+ 76 - 1
src/modules/ndb_redis/redis_client.c

@@ -49,6 +49,8 @@ extern int init_without_redis;
 extern int redis_connect_timeout_param;
 extern int redis_cmd_timeout_param;
 extern int redis_cluster_param;
+extern int disable_time;
+extern int allowed_timeouts;
 
 /* backwards compatibility with hiredis < 0.12 */
 #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
@@ -547,7 +549,15 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
 {
 	redisc_reply_t *rpl;
 	int i;
+
 	LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
+
+	/* if server is disabled do nothing unless the disable time has passed */
+	if (redis_check_server(rsrv))
+	{
+		goto srv_disabled;
+	}
+
 	if (rsrv->piped.pending_commands == 0)
 	{
 		LM_WARN("call for redis_cmd without any pipelined commands\n");
@@ -584,12 +594,14 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
 			redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
 			if (rpl->rplRedis == NULL)
 			{
+				redis_count_err_and_disable(rsrv);
 				LM_ERR("Unable to read reply\n");
 				goto error_exec;
 			}
 		}
 		else
 		{
+			redis_count_err_and_disable(rsrv);
 			goto error_exec;
 		}
 	}
@@ -613,11 +625,16 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
 		LM_DBG("reply is [%s]",rpl->rplRedis->str);
 	}
 	redisc_free_pipelined_cmds(rsrv);
+	rsrv->disable.consecutive_errors = 0;
 	return 0;
 
 error_exec:
 	redisc_free_pipelined_cmds(rsrv);
 	return -1;
+
+srv_disabled:
+	redisc_free_pipelined_cmds(rsrv);
+	return -2;
 }
 
 int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) {
@@ -711,6 +728,11 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
 		LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer. Automatically call redis_execute");
 		redisc_exec_pipelined(rsrv);
 	}
+	/* if server is disabled do nothing unless the disable time has passed */
+	if (redis_check_server(rsrv))
+	{
+		goto srv_disabled;
+	}
   
 	rpl = redisc_get_reply(res);
 	if(rpl==NULL)
@@ -738,7 +760,15 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
 		if(redisc_reconnect_server(rsrv)==0)
 		{
 			rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2);
-		} else {
+			if (rpl->rplRedis ==NULL)
+			{
+				redis_count_err_and_disable(rsrv);
+				goto error_exec;
+			}
+		}
+		else
+		{
+			redis_count_err_and_disable(rsrv);
 			LM_ERR("unable to reconnect to redis server: %.*s\n",
 					srv->len, srv->s);
 			cmd->s[cmd->len] = c;
@@ -781,6 +811,7 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
 		}
 	}
 	cmd->s[cmd->len] = c;
+	rsrv->disable.consecutive_errors = 0;
 	va_end(ap);
 	va_end(ap2);
 	va_end(ap3);
@@ -797,6 +828,13 @@ error_exec:
 	va_end(ap4);
 	return -1;
 
+srv_disabled:
+	va_end(ap);
+	va_end(ap2);
+	va_end(ap3);
+	va_end(ap4);
+	return -2;
+
 }
 
 /**
@@ -983,3 +1021,40 @@ int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len)
 	return REDIS_OK;
 }
 #endif
+
+int redis_check_server(redisc_server_t *rsrv)
+{
+
+	if (rsrv->disable.disabled)
+	{
+		if (get_ticks() > rsrv->disable.restore_tick)
+		{
+			LM_INFO("REDIS server %.*s re-enabled",rsrv->sname->len,rsrv->sname->s);
+			rsrv->disable.disabled = 0;
+			rsrv->disable.consecutive_errors = 0;
+		}
+		else
+		{
+			return 1;
+		}
+	}
+	return 0;
+}
+
+int redis_count_err_and_disable(redisc_server_t *rsrv)
+{
+	if (allowed_timeouts < 0)
+	{
+		return 0;
+	}
+
+	rsrv->disable.consecutive_errors++;
+	if (rsrv->disable.consecutive_errors > allowed_timeouts)
+	{
+		rsrv->disable.disabled=1;
+		rsrv->disable.restore_tick=get_ticks() + disable_time;
+		LM_WARN("REDIS server %.*s disabled for %d seconds",rsrv->sname->len,rsrv->sname->s,disable_time);
+		return 1;
+	}
+	return 0;
+}

+ 9 - 0
src/modules/ndb_redis/redis_client.h

@@ -53,6 +53,12 @@ typedef struct redisc_piped_cmds {
 	int pending_commands;
 } redisc_piped_cmds_t;
 
+typedef struct redisc_srv_disable {
+	int disabled;
+	int consecutive_errors;
+	time_t restore_tick;
+} redisc_srv_disable_t;
+
 typedef struct redisc_server {
 	str *sname;
 	unsigned int hname;
@@ -60,6 +66,7 @@ typedef struct redisc_server {
 	redisContext *ctxRedis;
 	struct redisc_server *next;
 	redisc_piped_cmds_t piped;
+	redisc_srv_disable_t disable;
 } redisc_server_t;
 
 typedef struct redisc_pv {
@@ -86,4 +93,6 @@ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
 redisc_reply_t *redisc_get_reply(str *name);
 int redisc_free_reply(str *name);
 int redisc_check_auth(redisc_server_t *rsrv, char *pass);
+int redis_check_server(redisc_server_t *rsrv);
+int redis_count_err_and_disable(redisc_server_t *rsrv);
 #endif