Parcourir la source

ndb_redis: enhance access to REDIS replies
- add support to access array within arrays

Claudiu Boriga il y a 8 ans
Parent
commit
f4a793ee33

+ 16 - 0
src/modules/ndb_redis/doc/ndb_redis_admin.xml

@@ -319,6 +319,12 @@ modparam("ndb_redis", "flush_on_reconnect", 1)
 			</listitem>
 			</itemizedlist>
 		</para>
+		<para>
+			In case one of the members of the array is also an array (for example calling SMEMBERS
+			in a MULTI/EXEC transaction), the members of the array can be accessed by adding any of
+			the above keys, after a value[n] key. The first value[n] references the element in the
+			first array, while the next key references an element of the referenced array.
+		</para>
 		<example>
 		<title><function>redis_cmd</function> usage</title>
 		<programlisting format="linespecific">
@@ -351,6 +357,16 @@ if(redis_cmd("srvN", "HMGET foo_key field1 field3", "r")) {
     xlog("array size: $redis(r=>size)\n");
     xlog("first values: $redis(r=>value[0]) , $redis(r=>value[1])\n");
 }
+
+
+# array as element of an array example
+redis_cmd("srvN", "MULTI", "r1");
+redis_cmd("srvN", "SMEMBERS foo", "r2");
+if (redis_cmd("srvN", "EXEC", "r")) {
+    xlog("array size: $redis(r=>value[0]=>size)\n");
+    xlog("first member of the set:$redis(r=>value[0]=>value[0])\n");
+    xlog("type of the second member of the set:$redis(r=>value[0]=>type[1])\n");
+}
 ...
 </programlisting>
 		</example>

+ 109 - 72
src/modules/ndb_redis/ndb_redis_mod.c

@@ -644,36 +644,36 @@ int redis_parse_index(str *in, gparam_t *gp)
 /**
  *
  */
-int redis_parse_token(str *in, gparam_t *gp, int i)
+int redis_parse_token(str *in, gparam_t *gp, int *i)
 {
 	str tok;
 
-	while(i<in->len && isspace(in->s[i]))
-		i++;
+	while(*i<in->len && isspace(in->s[*i]))
+		(*i)++;
 
-	if(i>=in->len-2)
+	if(*i>=in->len-2)
 		return -1;
 
-	if(in->s[i++]!='[')
+	if(in->s[(*i)++]!='[')
 		return -1;
 
-	while(i<in->len-1 && isspace(in->s[i]))
-		i++;
-	if(i==in->len-1 || in->s[i]==']')
+	while(*i<in->len-1 && isspace(in->s[*i]))
+		(*i)++;
+	if(*i==in->len-1 || in->s[*i]==']')
 		return -1;
-	tok.s = &(in->s[i]);
+	tok.s = &(in->s[*i]);
 
-	while(i<in->len && !isspace(in->s[i]) && in->s[i]!=']')
-		i++;
-	if(i==in->len)
+	while(*i<in->len && !isspace(in->s[*i]) && in->s[*i]!=']')
+		(*i)++;
+	if(*i==in->len)
 		return -1;
-	tok.len = &(in->s[i]) - tok.s;
+	tok.len = &(in->s[*i]) - tok.s;
 	if(redis_parse_index(&tok, gp)!=0)
 		return -1;
 
-	while(i<in->len && isspace(in->s[i]))
-		i++;
-	if(i==in->len || in->s[i]!=']')
+	while(*i<in->len && isspace(in->s[*i]))
+			(*i)++;
+	if(*i==in->len || in->s[*i]!=']')
 		return -1;
 
 	return 0;
@@ -687,6 +687,8 @@ static int pv_parse_redisc_name(pv_spec_p sp, str *in)
 	redisc_pv_t *rpv=NULL;
 	str pvs;
 	int i;
+	int key_idx;
+	int last_key;
 
 	if(in->s==NULL || in->len<=0)
 		return -1;
@@ -716,50 +718,67 @@ static int pv_parse_redisc_name(pv_spec_p sp, str *in)
 	while(i<pvs.len-2 && isspace(pvs.s[i]))
 		i++;
 
-	if(pvs.s[i]!='=')
-		goto error_var;
+	key_idx=0;
+	do
+	{
 
-	if(pvs.s[i+1]!='>')
-		goto error_var;
+		if (pvs.s[i] != '=')
+			goto error_var;
 
-	i += 2;
-	while(i<pvs.len && isspace(pvs.s[i]))
-		i++;
+		if (pvs.s[i + 1] != '>')
+			goto error_var;
 
-	if(i>=pvs.len)
-		goto error_key;
+		i += 2;
+		while (i < pvs.len && isspace(pvs.s[i]))
+			i++;
 
-	rpv->rkey.s   = pvs.s + i;
-	rpv->rkey.len = pvs.len - i;
+		if (i >= pvs.len)
+			goto error_key;
 
-	/* Default pos param initialization. */
-	rpv->pos.type = GPARAM_TYPE_INT;
-	rpv->pos.v.i = -1;
+		rpv->rkey.s = pvs.s + i;
+		rpv->rkey.len = pvs.len - i;
 
-	if(rpv->rkey.len>=5 && strncmp(rpv->rkey.s, "value", 5)==0) {
-		rpv->rkeyid = 1;
-		if(rpv->rkey.len>5)
-		{
-			i+=5;
-			if(redis_parse_token(&pvs, &(rpv->pos), i)!=0)
-				goto error_key;
+		/* Default pos param initialization. */
+		rpv->pos[key_idx].type = GPARAM_TYPE_INT;
+		rpv->pos[key_idx].v.i = -1;
+
+		last_key=1;
+		if(rpv->rkey.len>=5 && strncmp(rpv->rkey.s, "value", 5)==0) {
+			rpv->rkeyid = 1;
+			if(rpv->rkey.len>5)
+			{
+				i+=5;
+				if(redis_parse_token(&pvs, &(rpv->pos[key_idx]), &i)!=0)
+					goto error_key;
+				i++;
+				if(i!=pvs.len)
+					last_key = 0;
+			}
+		} else if(rpv->rkey.len>=4 && strncmp(rpv->rkey.s, "type", 4)==0) {
+			rpv->rkeyid = 0;
+			if (rpv->rkey.len>4)
+			{
+				i+=4;
+				if(redis_parse_token(&pvs, &(rpv->pos[key_idx]), &i)!=0)
+					goto error_key;
+			}
+		} else if (rpv->rkey.len==4 && strncmp(rpv->rkey.s, "info", 4)==0) {
+			rpv->rkeyid = 2;
+		} else if (rpv->rkey.len==4 && strncmp(rpv->rkey.s, "size", 4)==0) {
+			rpv->rkeyid = 3;
+		} else {
+			goto error_key;
 		}
-	} else if(rpv->rkey.len>=4 && strncmp(rpv->rkey.s, "type", 4)==0) {
-		rpv->rkeyid = 0;
-		if(rpv->rkey.len>4)
-		{
-			i+=4;
-			if(redis_parse_token(&pvs, &(rpv->pos), i)!=0)
-				goto error_key;
+
+		key_idx++;
+		if (key_idx > MAXIMUM_NESTED_KEYS) {
+			LM_ERR("Too many nested fields");
+			goto error_key;
 		}
-	} else if(rpv->rkey.len==4 && strncmp(rpv->rkey.s, "info", 4)==0) {
-		rpv->rkeyid = 2;
-	} else if(rpv->rkey.len==4 && strncmp(rpv->rkey.s, "size", 4)==0) {
-		rpv->rkeyid = 3;
-	} else {
-		goto error_key;
-	}
 
+	} while (!last_key);
+
+	rpv->rkeynum=key_idx;
 	sp->pvp.pvn.u.dname = (void*)rpv;
 	sp->pvp.pvn.type = PV_NAME_OTHER;
 	return 0;
@@ -784,6 +803,8 @@ static int pv_get_redisc(struct sip_msg *msg,  pv_param_t *param,
 	redisc_pv_t *rpv;
 	str s;
 	int pos;
+	int key_idx=0;
+	redisReply *reply;
 
 	rpv = (redisc_pv_t*)param->pvn.u.dname;
 	if(rpv->reply==NULL)
@@ -796,38 +817,54 @@ static int pv_get_redisc(struct sip_msg *msg,  pv_param_t *param,
 	if(rpv->reply->rplRedis==NULL)
 		return pv_get_null(msg, param, res);
 
+	reply=rpv->reply->rplRedis;
+	/* if we have arrays within arrays go to the element before last */
+	if (rpv->rkeynum>1)
+		for (key_idx=0; key_idx < rpv->rkeynum-1; key_idx++)
+		{
+			if(fixup_get_ivalue(msg, &rpv->pos[key_idx], &pos)!=0)
+				return pv_get_null(msg, param, res);
+			if(pos<0 || pos>=(int)reply->elements)
+				return pv_get_null(msg, param, res);
+			reply=reply->element[pos];
+			if (reply == NULL)
+			{
+				LM_ERR("The reply is corrupted");
+				return pv_get_null(msg, param, res);
+			}
+		}
 
-	if(fixup_get_ivalue(msg, &rpv->pos, &pos)!=0)
+	if(fixup_get_ivalue(msg, &rpv->pos[key_idx], &pos)!=0)
 		return pv_get_null(msg, param, res);
 
 	switch(rpv->rkeyid) {
 		case 1:
 			/* value */
-			switch(rpv->reply->rplRedis->type) {
+			switch(reply->type) {
 				case REDIS_REPLY_STRING:
 					if(pos!=-1)
 						return pv_get_null(msg, param, res);
-					s.len = rpv->reply->rplRedis->len;
-					s.s = rpv->reply->rplRedis->str;
+					s.len = reply->len;
+					s.s = reply->str;
 					return pv_get_strval(msg, param, res, &s);
 				case REDIS_REPLY_INTEGER:
 					if(pos!=-1)
 						return pv_get_null(msg, param, res);
 					return pv_get_sintval(msg, param, res,
-									(int)rpv->reply->rplRedis->integer);
+									(int)reply->integer);
 				case REDIS_REPLY_ARRAY:
-					if(pos<0 || pos>=(int)rpv->reply->rplRedis->elements)
+					if(pos<0 || pos>=(int)reply->elements)
 						return pv_get_null(msg, param, res);
-					if(rpv->reply->rplRedis->element[pos]==NULL)
+					if(reply->element[pos]==NULL)
 						return pv_get_null(msg, param, res);
-					switch(rpv->reply->rplRedis->element[pos]->type) {
+					switch(reply->element[pos]->type) {
 						case REDIS_REPLY_STRING:
-						s.len = rpv->reply->rplRedis->element[pos]->len;
-							s.s = rpv->reply->rplRedis->element[pos]->str;
+						s.len = reply->element[pos]->len;
+							s.s = reply->element[pos]->str;
 							return pv_get_strval(msg, param, res, &s);
 						case REDIS_REPLY_INTEGER:
 							return pv_get_sintval(msg, param, res,
-								(int)rpv->reply->rplRedis->element[pos]->integer);
+								(int)reply->element[pos]->integer);
 						default:
 							return pv_get_null(msg, param, res);
 					}
@@ -836,16 +873,16 @@ static int pv_get_redisc(struct sip_msg *msg,  pv_param_t *param,
 			}
 		case 2:
 			/* info */
-			if(rpv->reply->rplRedis->str==NULL)
+			if(reply->str==NULL)
 				return pv_get_null(msg, param, res);
-			s.len = rpv->reply->rplRedis->len;
-			s.s = rpv->reply->rplRedis->str;
+			s.len = reply->len;
+			s.s = reply->str;
 			return pv_get_strval(msg, param, res, &s);
 		case 3:
 			/* size */
-			if(rpv->reply->rplRedis->type == REDIS_REPLY_ARRAY) {
+			if(reply->type == REDIS_REPLY_ARRAY) {
 				return pv_get_uintval(msg, param, res,
-						(unsigned int)rpv->reply->rplRedis->elements);
+						(unsigned int)reply->elements);
 			} else {
 				return pv_get_null(msg, param, res);
 			}
@@ -853,15 +890,15 @@ static int pv_get_redisc(struct sip_msg *msg,  pv_param_t *param,
 			/* type */
 			if(pos==-1)
 				return pv_get_sintval(msg, param, res,
-								rpv->reply->rplRedis->type);
-			if(rpv->reply->rplRedis->type != REDIS_REPLY_ARRAY)
+								reply->type);
+			if(reply->type != REDIS_REPLY_ARRAY)
 				return pv_get_null(msg, param, res);
-			if(pos<0 || pos>=(int)rpv->reply->rplRedis->elements)
+			if(pos<0 || pos>=(int)reply->elements)
 				return pv_get_null(msg, param, res);
-			if(rpv->reply->rplRedis->element[pos]==NULL)
+			if(reply->element[pos]==NULL)
 				return pv_get_null(msg, param, res);
 			return pv_get_sintval(msg, param, res,
-					rpv->reply->rplRedis->element[pos]->type);
+					reply->element[pos]->type);
 		default:
 			/* We do nothing. */
 			return pv_get_null(msg, param, res);

+ 3 - 1
src/modules/ndb_redis/redis_client.h

@@ -35,6 +35,7 @@
 #include "../../core/mod_fix.h"
 
 #define MAXIMUM_PIPELINED_COMMANDS 1000
+#define MAXIMUM_NESTED_KEYS 10
 
 int redisc_init(void);
 int redisc_destroy(void);
@@ -74,7 +75,8 @@ typedef struct redisc_pv {
 	redisc_reply_t *reply;
 	str rkey;
 	int rkeyid;
-	gparam_t pos;  /* Array element position. */
+	gparam_t pos[MAXIMUM_NESTED_KEYS];  /* Array element position. */
+	int rkeynum;
 } redisc_pv_t;
 
 /* Server related functions */