Kaynağa Gözat

Merge pull request #1115 from claudiupb/fix_redis_pipeline

Fix redis pipeline
Daniel-Constantin Mierla 8 yıl önce
ebeveyn
işleme
99a73cb42e

+ 100 - 16
src/modules/ndb_redis/redis_client.c

@@ -50,6 +50,15 @@ extern int redis_connect_timeout_param;
 extern int redis_cmd_timeout_param;
 extern int redis_cluster_param;
 
+/* backwards compatibility with hiredis < 0.12 */
+#if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
+typedef char *sds;
+sds sdscatlen(sds s, const void *t, size_t len);
+int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len);
+#else
+#define redis_append_formatted_command redisAppendFormattedCommand
+#endif
+
 /**
  *
  */
@@ -421,7 +430,7 @@ int redisc_append_cmd(str *srv, str *res, str *cmd, ...)
 		LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
 		goto error_cmd;
 	}
-	if (rsrv->pendingReplies >= MAXIMUM_PIPELINED_COMMANDS)
+	if (rsrv->piped.pending_commands >= MAXIMUM_PIPELINED_COMMANDS)
 	{
 		LM_ERR("Too many pipelined commands, maximum is %d\n",MAXIMUM_PIPELINED_COMMANDS);
 		goto error_cmd;
@@ -435,13 +444,17 @@ int redisc_append_cmd(str *srv, str *res, str *cmd, ...)
 
 	c = cmd->s[cmd->len];
 	cmd->s[cmd->len] = '\0';
-	if (redisvAppendCommand(rsrv->ctxRedis,cmd->s,ap) != REDIS_OK)
+	rsrv->piped.commands[rsrv->piped.pending_commands].len = redisvFormatCommand(
+			&rsrv->piped.commands[rsrv->piped.pending_commands].s,
+			cmd->s,
+			ap);
+	if (rsrv->piped.commands[rsrv->piped.pending_commands].len < 0)
 	{
 		LM_ERR("Invalid redis command : %s\n",cmd->s);
 		goto error_cmd;
 	}
-	rsrv->pipelinedReplies[rsrv->pendingReplies]=rpl;
-	rsrv->pendingReplies++;
+	rsrv->piped.replies[rsrv->piped.pending_commands]=rpl;
+	rsrv->piped.pending_commands++;
 
 	cmd->s[cmd->len] = c;
 	va_end(ap);
@@ -485,6 +498,48 @@ int redisc_exec_pipelined_cmd(str *srv)
 	return redisc_exec_pipelined(rsrv);
 }
 
+/**
+ *
+ */
+int redisc_create_pipelined_message(redisc_server_t *rsrv)
+{
+	int i;
+
+	if (rsrv->ctxRedis->err)
+	{
+		LM_DBG("Reconnecting server because of error %d: \"%s\"",rsrv->ctxRedis->err,rsrv->ctxRedis->errstr);
+		if (redisc_reconnect_server(rsrv))
+		{
+			LM_ERR("unable to reconnect to REDIS server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
+			return -1;
+		}
+	}
+
+	for (i=0;i<rsrv->piped.pending_commands;i++)
+	{
+		if (redis_append_formatted_command(rsrv->ctxRedis,rsrv->piped.commands[i].s,rsrv->piped.commands[i].len) != REDIS_OK)
+		{
+			LM_ERR("Error while appending command %d",i);
+			return -1;
+		}
+	}
+	return 0;
+}
+
+/**
+ *
+ */
+void redisc_free_pipelined_cmds(redisc_server_t *rsrv)
+{
+	int i;
+	for (i=0;i<rsrv->piped.pending_commands;i++)
+	{
+		free(rsrv->piped.commands[i].s);
+		rsrv->piped.commands[i].len=0;
+	}
+	rsrv->piped.pending_commands=0;
+}
+
 /**
  *
  */
@@ -493,14 +548,19 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
 	redisc_reply_t *rpl;
 	int i;
 	LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
-	if (rsrv->pendingReplies == 0)
+	if (rsrv->piped.pending_commands == 0)
 	{
-		LM_ERR("call for redis_cmd without any pipelined commands\n");
+		LM_WARN("call for redis_cmd without any pipelined commands\n");
 		return -1;
 	}
+	if(rsrv->ctxRedis==NULL)
+	{
+		LM_ERR("no redis context for server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
+		goto error_exec;
+	}
 
-	/* send the first command and wait for the replies */
-	rpl=rsrv->pipelinedReplies[0];
+	/* send the commands and retrieve the first reply */
+	rpl=rsrv->piped.replies[0];
 
 	if(rpl->rplRedis!=NULL)
 	{
@@ -509,7 +569,9 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
 		rpl->rplRedis = NULL;
 	}
 
+	redisc_create_pipelined_message(rsrv);
 	redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
+
 	if (rpl->rplRedis == NULL)
 	{
 		/* null reply, reconnect and try again */
@@ -517,7 +579,7 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
 		{
 			LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
 		}
-		if (redisc_reconnect_server(rsrv) == 0)
+		if (redisc_create_pipelined_message(rsrv) == 0)
 		{
 			redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
 			if (rpl->rplRedis == NULL)
@@ -528,28 +590,33 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
 		}
 		else
 		{
-			LM_ERR("unable to reconnect to redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
 			goto error_exec;
 		}
 	}
 	LM_DBG("reply is [%s]",rpl->rplRedis->str);
 
 	/* replies are received just retrieve them */
-	for (i=1;i<rsrv->pendingReplies;i++)
+	for (i=1;i<rsrv->piped.pending_commands;i++)
 	{
-		rpl=rsrv->pipelinedReplies[i];
+		rpl=rsrv->piped.replies[i];
+		if(rpl->rplRedis!=NULL)
+		{
+			/* clean up previous redis reply */
+			freeReplyObject(rpl->rplRedis);
+			rpl->rplRedis = NULL;
+		}
 		if (redisGetReplyFromReader(rsrv->ctxRedis, (void**) &rpl->rplRedis) != REDIS_OK)
 		{
 			LM_ERR("Unable to read reply\n");
-			goto error_exec;
+			continue;
 		}
 		LM_DBG("reply is [%s]",rpl->rplRedis->str);
 	}
-	rsrv->pendingReplies = 0;
+	redisc_free_pipelined_cmds(rsrv);
 	return 0;
 
 error_exec:
-	rsrv->pendingReplies = 0;
+	redisc_free_pipelined_cmds(rsrv);
 	return -1;
 }
 
@@ -639,7 +706,7 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
 	}
 	LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
   
-	if (rsrv->pendingReplies != 0)
+	if (rsrv->piped.pending_commands != 0)
 	{
 		LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer. Automatically call redis_execute");
 		redisc_exec_pipelined(rsrv);
@@ -899,3 +966,20 @@ int redisc_check_auth(redisc_server_t *rsrv, char *pass)
 	freeReplyObject(reply);
 	return retval;
 }
+
+/* backwards compatibility with hiredis < 0.12 */
+#if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
+int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len)
+{
+	sds newbuf;
+
+	newbuf = sdscatlen(c->obuf,cmd,len);
+	if (newbuf == NULL) {
+		c->err = REDIS_ERR_OOM;
+		strcpy(c->errstr,"Out of memory");
+		return REDIS_ERR;
+	}
+	c->obuf = newbuf;
+	return REDIS_OK;
+}
+#endif

+ 9 - 2
src/modules/ndb_redis/redis_client.h

@@ -47,14 +47,19 @@ typedef struct redisc_reply {
 	struct redisc_reply *next;
 } redisc_reply_t;
 
+typedef struct redisc_piped_cmds {
+	str commands[MAXIMUM_PIPELINED_COMMANDS];
+	redisc_reply_t *replies[MAXIMUM_PIPELINED_COMMANDS];
+	int pending_commands;
+} redisc_piped_cmds_t;
+
 typedef struct redisc_server {
 	str *sname;
 	unsigned int hname;
 	param_t *attrs;
 	redisContext *ctxRedis;
 	struct redisc_server *next;
-	redisc_reply_t *pipelinedReplies[MAXIMUM_PIPELINED_COMMANDS];
-	int pendingReplies;
+	redisc_piped_cmds_t piped;
 } redisc_server_t;
 
 typedef struct redisc_pv {
@@ -74,6 +79,8 @@ int redisc_exec(str *srv, str *res, str *cmd, ...);
 int redisc_append_cmd(str *srv, str *res, str *cmd, ...);
 int redisc_exec_pipelined_cmd(str *srv);
 int redisc_exec_pipelined(redisc_server_t *rsrv);
+int redisc_create_pipelined_message(redisc_server_t *rsrv);
+void redisc_free_pipelined_cmds(redisc_server_t *rsrv);
 redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
 		const size_t *argvlen);
 redisc_reply_t *redisc_get_reply(str *name);