소스 검색

dispatcher: new function ds_select_routing(rules, mode, [limit])

- select target addresses from a combination of groups and algorithms,
control where the first destination is pushed and optionally set a limit
Daniel-Constantin Mierla 7 년 전
부모
커밋
3933a64c46
3개의 변경된 파일285개의 추가작업 그리고 65개의 파일을 삭제
  1. 93 63
      src/modules/dispatcher/dispatch.c
  2. 15 2
      src/modules/dispatcher/dispatch.h
  3. 177 0
      src/modules/dispatcher/dispatcher.c

+ 93 - 63
src/modules/dispatcher/dispatch.c

@@ -1935,19 +1935,60 @@ int ds_select_dst(struct sip_msg *msg, int set, int alg, int mode)
 
 /**
  * Set destination address from group 'set' selected with alogorithm 'alg'
- * - the rest of addresses in group are added as next destination in avps,
+ * - the rest of addresses in group are added as next destination in xavps,
  *   up to the 'limit'
  * - mode specify to set address in R-URI or outboud proxy
  *
  */
-int ds_select_dst_limit(
-		sip_msg_t *msg, int set, int alg, unsigned int limit, int mode)
+int ds_select_dst_limit(sip_msg_t *msg, int set, int alg, uint32_t limit,
+		int mode)
 {
-	int i, cnt;
+	int ret;
+	sr_xval_t nxval;
+	ds_select_state_t vstate;
+
+	memset(&vstate, 0, sizeof(ds_select_state_t));
+	vstate.setid = set;
+	vstate.alg = alg;
+	vstate.umode = mode;
+	vstate.limit = limit;
+
+	if(vstate.limit == 0) {
+		LM_DBG("Limit set to 0 - forcing to unlimited\n");
+		vstate.limit = 0xffffffff;
+	}
+
+	ret = ds_manage_routes(msg, &vstate);
+	if(ret<0) {
+		return ret;
+	}
+
+	/* add cnt value to xavp */
+	if(((ds_xavp_ctx_mode & DS_XAVP_CTX_SKIP_CNT)==0)
+			&& (ds_xavp_ctx.len >= 0)) {
+		/* add to xavp the number of selected dst records */
+		memset(&nxval, 0, sizeof(sr_xval_t));
+		nxval.type = SR_XTYPE_INT;
+		nxval.v.i = vstate.cnt;
+		if(xavp_add_xavp_value(&ds_xavp_ctx, &ds_xavp_ctx_cnt, &nxval, NULL)==NULL) {
+			LM_ERR("failed to add cnt value to xavp\n");
+			return -1;
+		}
+	}
+
+	LM_DBG("selected target destinations: %d\n", vstate.cnt);
+
+	return ret;
+}
+
+/**
+ *
+ */
+int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
+{
+	int i;
 	unsigned int hash;
 	ds_set_t *idx = NULL;
-	sr_xval_t nxval;
-	sr_xavp_t *pxavp = NULL;
 
 	if(msg == NULL) {
 		LM_ERR("bad parameters\n");
@@ -1959,13 +2000,7 @@ int ds_select_dst_limit(
 		return -1;
 	}
 
-	if(limit == 0) {
-		LM_DBG("Limit set to 0 - forcing to unlimited\n");
-		limit = 0xffffffff;
-	}
-	--limit; /* reserving 1 slot for selected dst */
-
-	if((mode == DS_SETOP_DSTURI) && (ds_force_dst == 0)
+	if((rstate->umode == DS_SETOP_DSTURI) && (ds_force_dst == 0)
 			&& (msg->dst_uri.s != NULL || msg->dst_uri.len > 0)) {
 		LM_ERR("destination already set [%.*s]\n", msg->dst_uri.len,
 				msg->dst_uri.s);
@@ -1974,15 +2009,15 @@ int ds_select_dst_limit(
 
 
 	/* get the index of the set */
-	if(ds_get_index(set, *crt_idx, &idx) != 0) {
-		LM_ERR("destination set [%d] not found\n", set);
+	if(ds_get_index(rstate->setid, *crt_idx, &idx) != 0) {
+		LM_ERR("destination set [%d] not found\n", rstate->setid);
 		return -1;
 	}
 
-	LM_DBG("set [%d]\n", set);
+	LM_DBG("set [%d]\n", rstate->setid);
 
 	hash = 0;
-	switch(alg) {
+	switch(rstate->alg) {
 		case DS_ALG_HASHCALLID: /* 0 - hash call-id */
 			if(ds_hash_callid(msg, &hash) != 0) {
 				LM_ERR("can't get callid hash\n");
@@ -2048,14 +2083,14 @@ int ds_select_dst_limit(
 			if(msg->first_line.u.request.method_value != METHOD_INVITE) {
 				/* use first entry */
 				hash = 0;
-				alg = 0;
+				rstate->alg = 0;
 				break;
 			}
 			if(ds_xavp_dst.len <= 0) {
 				LM_ERR("no dst xavp for load distribution"
 					   " - using first entry...\n");
 				hash = 0;
-				alg = 0;
+				rstate->alg = 0;
 			} else {
 				i = ds_get_leastloaded(idx);
 				if(i < 0) {
@@ -2063,10 +2098,10 @@ int ds_select_dst_limit(
 					return -1;
 				}
 				hash = i;
-				if(ds_load_add(msg, idx, set, hash) < 0) {
+				if(ds_load_add(msg, idx, rstate->setid, hash) < 0) {
 					LM_ERR("unable to update destination load"
 						   " - classic dispatching\n");
-					alg = 0;
+					rstate->alg = 0;
 				}
 			}
 			break;
@@ -2078,12 +2113,12 @@ int ds_select_dst_limit(
 			hash = 0;
 			break;
 		default:
-			LM_WARN("algo %d not implemented - using first entry...\n", alg);
+			LM_WARN("algo %d not implemented - using first entry...\n",
+					rstate->alg);
 			hash = 0;
 	}
 
-	LM_DBG("alg hash [%u]\n", hash);
-	cnt = 0;
+	LM_DBG("using alg [%d] hash [%u]\n", rstate->alg, hash);
 
 	if(ds_use_default != 0 && idx->nr != 1)
 		hash = hash % (idx->nr - 1);
@@ -2112,23 +2147,21 @@ int ds_select_dst_limit(
 
 	hash = i;
 
-	if(mode!=DS_SETOP_XAVP) {
+	if(rstate->umode!=DS_SETOP_XAVP) {
 		if(ds_push_dst(msg, &idx->dlist[hash].uri, idx->dlist[hash].sock,
-					mode) != 0) {
+					rstate->umode) != 0) {
 			LM_ERR("cannot set next hop address with: %.*s\n",
 					idx->dlist[hash].uri.len, idx->dlist[hash].uri.s);
 			return -1;
 		}
 	}
-	/* if alg is round-robin then update the shortcut to next to be used */
-	if(alg == DS_ALG_ROUNDROBIN)
-		idx->last = (hash + 1) % idx->nr;
 
-	LM_DBG("selected [%d-%d/%d] <%.*s>\n", alg, set, hash,
+	LM_DBG("selected [%d-%d-%d/%d] <%.*s>\n", rstate->alg, rstate->setid,
+			rstate->umode, hash,
 			idx->dlist[hash].uri.len, idx->dlist[hash].uri.s);
 
-	if(alg == DS_ALG_PARALLEL) {
-		if(ds_add_branches(msg, idx, hash, mode)<0) {
+	if(rstate->alg == DS_ALG_PARALLEL) {
+		if(ds_add_branches(msg, idx, hash, rstate->umode)<0) {
 			LM_ERR("failed to add additional branches\n");
 			/* one destination was already set - return success anyhow */
 			return 2;
@@ -2144,70 +2177,67 @@ int ds_select_dst_limit(
 		return 1;
 	}
 
-	LM_DBG("using first entry [%d/%d]\n", set, hash);
-	if(ds_add_xavp_record(idx, hash, set, alg, &pxavp)<0) {
-		LM_ERR("failed to add destination in the xavp (%d/%d)\n", hash, set);
+	LM_DBG("using first entry [%d/%d]\n", rstate->setid, hash);
+	if(ds_add_xavp_record(idx, hash, rstate->setid, rstate->alg,
+				&rstate->lxavp)<0) {
+		LM_ERR("failed to add destination in the xavp (%d/%d)\n",
+				hash, rstate->setid);
 		return -1;
 	}
+	rstate->cnt++;
 
 	/* add to xavp the destinations after the selected one */
-	for(i = hash + 1; i < idx->nr && cnt < limit; i++) {
+	for(i = hash + 1; i < idx->nr && rstate->cnt < rstate->limit; i++) {
 		if(ds_skip_dst(idx->dlist[i].flags)
 				|| (ds_use_default != 0 && i == (idx->nr - 1))) {
 			continue;
 		}
 		/* max load exceeded per destination */
-		if(alg == DS_ALG_CALLLOAD
+		if(rstate->alg == DS_ALG_CALLLOAD
 				&& idx->dlist[i].dload >= idx->dlist[i].attrs.maxload) {
 			continue;
 		}
-		LM_DBG("using entry [%d/%d]\n", set, i);
-		if(ds_add_xavp_record(idx, i, set, alg, &pxavp)<0) {
-			LM_ERR("failed to add destination in the xavp (%d/%d)\n", i, set);
+		LM_DBG("using entry [%d/%d]\n", rstate->setid, i);
+		if(ds_add_xavp_record(idx, i, rstate->setid, rstate->alg,
+					&rstate->lxavp)<0) {
+			LM_ERR("failed to add destination in the xavp (%d/%d)\n",
+					i, rstate->setid);
 			return -1;
 		}
-		cnt++;
+		rstate->cnt++;
 	}
 
 	/* add to xavp the destinations before the selected one */
-	for(i = 0; i < hash && cnt < limit; i++) {
+	for(i = 0; i < hash && rstate->cnt < rstate->limit; i++) {
 		if(ds_skip_dst(idx->dlist[i].flags)
 				|| (ds_use_default != 0 && i == (idx->nr - 1))) {
 			continue;
 		}
 		/* max load exceeded per destination */
-		if(alg == DS_ALG_CALLLOAD
+		if(rstate->alg == DS_ALG_CALLLOAD
 				&& idx->dlist[i].dload >= idx->dlist[i].attrs.maxload) {
 			continue;
 		}
-		LM_DBG("using entry [%d/%d]\n", set, i);
-		if(ds_add_xavp_record(idx, i, set, alg, &pxavp)<0) {
-			LM_ERR("failed to add destination in the xavp (%d/%d)\n", i, set);
+		LM_DBG("using entry [%d/%d]\n", rstate->setid, i);
+		if(ds_add_xavp_record(idx, i, rstate->setid, rstate->alg,
+					&rstate->lxavp)<0) {
+			LM_ERR("failed to add destination in the xavp (%d/%d)\n",
+					i, rstate->setid);
 			return -1;
 		}
-		cnt++;
+		rstate->cnt++;
 	}
 
 	/* add default dst to last position in XAVP list */
-	if(ds_use_default != 0 && hash != idx->nr - 1 && cnt < limit) {
-		LM_DBG("using default entry [%d/%d]\n", set, idx->nr - 1);
-		if(ds_add_xavp_record(idx, idx->nr - 1, set, alg, &pxavp)<0) {
+	if(ds_use_default != 0 && hash != idx->nr - 1
+				&& rstate->cnt < rstate->limit) {
+		LM_DBG("using default entry [%d/%d]\n", rstate->setid, idx->nr - 1);
+		if(ds_add_xavp_record(idx, idx->nr - 1, rstate->setid, rstate->alg,
+					&rstate->lxavp)<0) {
 			LM_ERR("failed to add default destination in the xavp\n");
 			return -1;
 		}
-		cnt++;
-	}
-
-	if(((ds_xavp_ctx_mode & DS_XAVP_CTX_SKIP_CNT)==0)
-			&& (ds_xavp_ctx.len >= 0)) {
-		/* add to xavp the number of selected dst records */
-		memset(&nxval, 0, sizeof(sr_xval_t));
-		nxval.type = SR_XTYPE_INT;
-		nxval.v.i = cnt;
-		if(xavp_add_xavp_value(&ds_xavp_ctx, &ds_xavp_ctx_cnt, &nxval, NULL)==NULL) {
-			LM_ERR("failed to add cnt value to xavp\n");
-			return -1;
-		}
+		rstate->cnt++;
 	}
 
 	return 1;

+ 15 - 2
src/modules/dispatcher/dispatch.h

@@ -30,6 +30,7 @@
 
 #include <stdio.h>
 #include "../../core/pvar.h"
+#include "../../core/xavp.h"
 #include "../../core/parser/msg_parser.h"
 #include "../../core/rand/kam_rand.h"
 #include "../../modules/tm/tm_load.h"
@@ -121,8 +122,8 @@ void ds_disconnect_db(void);
 int ds_load_db(void);
 int ds_reload_db(void);
 int ds_destroy_list(void);
-int ds_select_dst_limit(
-		struct sip_msg *msg, int set, int alg, unsigned int limit, int mode);
+int ds_select_dst_limit(sip_msg_t *msg, int set, int alg, uint32_t limit,
+		int mode);
 int ds_select_dst(struct sip_msg *msg, int set, int alg, int mode);
 int ds_update_dst(struct sip_msg *msg, int upos, int mode);
 int ds_update_state(sip_msg_t *msg, int group, str *address, int state);
@@ -210,6 +211,16 @@ typedef struct _ds_set {
 	int longer;
 	gen_lock_t lock;
 } ds_set_t;
+
+typedef struct _ds_select_state {
+	int setid;
+	int alg;
+	int umode;
+	uint32_t limit;
+	int cnt;
+	sr_xavp_t *lxavp;
+} ds_select_state_t;
+
 /* clang-format on */
 
 #define AVL_LEFT 0
@@ -229,4 +240,6 @@ ds_set_t *ds_avl_insert(ds_set_t **root, int id, int *setn);
 ds_set_t *ds_avl_find(ds_set_t *node, int id);
 void ds_avl_destroy(ds_set_t **node);
 
+int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate);
+
 #endif

+ 177 - 0
src/modules/dispatcher/dispatcher.c

@@ -153,6 +153,8 @@ static int w_ds_select_dst(struct sip_msg*, char*, char*);
 static int w_ds_select_dst_limit(struct sip_msg*, char*, char*, char*);
 static int w_ds_select_domain(struct sip_msg*, char*, char*);
 static int w_ds_select_domain_limit(struct sip_msg*, char*, char*, char*);
+static int w_ds_select_routes(sip_msg_t*, char*, char*);
+static int w_ds_select_routes_limit(sip_msg_t*, char*, char*, char*);
 static int w_ds_next_dst(struct sip_msg*, char*, char*);
 static int w_ds_next_domain(struct sip_msg*, char*, char*);
 static int w_ds_set_dst(struct sip_msg*, char*, char*);
@@ -189,6 +191,10 @@ static cmd_export_t cmds[]={
 		fixup_igp_igp, 0, REQUEST_ROUTE|FAILURE_ROUTE},
 	{"ds_select_domain", (cmd_function)w_ds_select_domain_limit, 3,
 		fixup_igp_all, 0, REQUEST_ROUTE|FAILURE_ROUTE},
+	{"ds_select_routes", (cmd_function)w_ds_select_routes, 2,
+		fixup_spve_spve, 0, REQUEST_ROUTE|FAILURE_ROUTE},
+	{"ds_select_routes", (cmd_function)w_ds_select_routes_limit, 3,
+		fixup_spve_spve_igp, 0, REQUEST_ROUTE|FAILURE_ROUTE},
 	{"ds_next_dst",      (cmd_function)w_ds_next_dst,      0,
 		ds_warn_fixup, 0, REQUEST_ROUTE|FAILURE_ROUTE},
 	{"ds_next_domain",   (cmd_function)w_ds_next_domain,   0,
@@ -619,6 +625,167 @@ static int w_ds_select_domain_limit(
 			DS_SETOP_RURI /*set host port*/);
 }
 
+/**
+ *
+ */
+static int ki_ds_select_routes_limit(sip_msg_t *msg, str *srules, str *smode,
+		int rlimit)
+{
+	int i;
+	int vret;
+	int gret;
+	int vfirst;
+	sr_xval_t nxval;
+	ds_select_state_t vstate;
+
+	memset(&vstate, 0, sizeof(ds_select_state_t));
+	vstate.limit = (uint32_t)rlimit;
+	if(vstate.limit == 0) {
+		LM_DBG("Limit set to 0 - forcing to unlimited\n");
+		vstate.limit = 0xffffffff;
+	}
+	vret = -1;
+	gret = -1;
+	vfirst = 0;
+	i = 0;
+	while(i<srules->len) {
+		vstate.setid = 0;
+		for(; i<srules->len; i++) {
+			if(srules->s[i]<'0' || srules->s[i]>'9') {
+				if(srules->s[i]=='=') {
+					i++;
+					break;
+				} else {
+					LM_ERR("invalid character in [%.*s] at [%d]\n",
+							srules->len, srules->s, i);
+					return -1;
+				}
+			}
+			vstate.setid = (vstate.setid * 10) + (srules->s[i] - '0');
+		}
+		vstate.alg = 0;
+		for(; i<srules->len; i++) {
+			if(srules->s[i]<'0' || srules->s[i]>'9') {
+				if(srules->s[i]==';') {
+					i++;
+					break;
+				} else {
+					LM_ERR("invalid character in [%.*s] at [%d]\n",
+							srules->len, srules->s, i);
+					return -1;
+				}
+			}
+			vstate.alg = (vstate.alg * 10) + (srules->s[i] - '0');
+		}
+		LM_DBG("routing with setid=%d alg=%d cnt=%d limit=0x%x (%u)\n",
+			vstate.setid, vstate.alg, vstate.cnt, vstate.limit, vstate.limit);
+		
+		vstate.umode = DS_SETOP_XAVP;
+		if(vfirst==0) {
+			switch(smode->s[0]) {
+				case '0':
+				case 'd':
+				case 'D':
+					vstate.umode = DS_SETOP_DSTURI;
+				break;
+				case '1':
+				case 'r':
+				case 'R':
+					vstate.umode = DS_SETOP_RURI;
+				break;
+				case '2':
+				case 'x':
+				case 'X':
+				break;
+				default:
+					LM_ERR("invalid routing mode parameter: %.*s\n",
+							smode->len, smode->s);
+					return -1;
+			}
+			vfirst = 1;
+		}
+		vret = ds_manage_routes(msg, &vstate);
+		if(vret<0) {
+			LM_DBG("failed to select target destinations from %d=%d [%.*s]\n",
+					vstate.setid, vstate.alg, srules->len, srules->s);
+			/* continue to try other target groups */
+		} else {
+			if(vret>0) {
+				gret = vret;
+			}
+		}
+	}
+
+	if(gret<0) {
+		/* no selection of a target address */
+		LM_DBG("failed to select any target destinations from [%.*s]\n",
+					srules->len, srules->s);
+		/* return last failure code when trying to select target addresses */
+		return vret;
+	}
+
+	/* add cnt value to xavp */
+	if(((ds_xavp_ctx_mode & DS_XAVP_CTX_SKIP_CNT)==0)
+			&& (ds_xavp_ctx.len >= 0)) {
+		/* add to xavp the number of selected dst records */
+		memset(&nxval, 0, sizeof(sr_xval_t));
+		nxval.type = SR_XTYPE_INT;
+		nxval.v.i = vstate.cnt;
+		if(xavp_add_xavp_value(&ds_xavp_ctx, &ds_xavp_ctx_cnt, &nxval, NULL)==NULL) {
+			LM_ERR("failed to add cnt value to xavp\n");
+			return -1;
+		}
+	}
+
+	LM_DBG("selected target destinations: %d\n", vstate.cnt);
+	return gret;
+}
+
+/**
+ *
+ */
+static int ki_ds_select_routes(sip_msg_t *msg, str *srules, str *smode)
+{
+	return ki_ds_select_routes_limit(msg, srules, smode, 0);
+}
+
+/**
+ *
+ */
+static int w_ds_select_routes(sip_msg_t *msg, char *lrules, char *umode)
+{
+	return w_ds_select_routes_limit(msg, lrules, umode, 0);
+}
+
+/**
+ *
+ */
+static int w_ds_select_routes_limit(sip_msg_t *msg, char *lrules, char *umode,
+		char *rlimit)
+{
+	str vrules;
+	str vmode;
+	int vlimit;
+
+	if(fixup_get_svalue(msg, (gparam_t*)lrules, &vrules)<0) {
+		LM_ERR("failed to get routing rules parameter\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)umode, &vmode)<0) {
+		LM_ERR("failed to get update mode parameter\n");
+		return -1;
+	}
+	if(rlimit!=NULL) {
+		if(fixup_get_ivalue(msg, (gparam_t*)rlimit, &vlimit)<0) {
+			LM_ERR("failed to get limit parameter\n");
+			return -1;
+		}
+	} else {
+		vlimit = 0;
+	}
+	return ki_ds_select_routes_limit(msg, &vrules, &vmode, vlimit);
+}
+
 /**
  *
  */
@@ -1112,6 +1279,16 @@ static sr_kemi_t sr_kemi_dispatcher_exports[] = {
 		{ SR_KEMIP_INT, SR_KEMIP_INT, SR_KEMIP_INT,
 			SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
 	},
+	{ str_init("dispatcher"), str_init("ds_select_routes"),
+		SR_KEMIP_INT, ki_ds_select_routes,
+		{ SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE,
+			SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
+	},
+	{ str_init("dispatcher"), str_init("ds_select_routes_limit"),
+		SR_KEMIP_INT, ki_ds_select_routes_limit,
+		{ SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_INT,
+			SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
+	},
 	{ str_init("dispatcher"), str_init("ds_next_dst"),
 		SR_KEMIP_INT, ki_ds_next_dst,
 		{ SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE,