瀏覽代碼

dispatcher: new algorithm - parallel dispatching (12)

- send the request to all destinations from the setid at once
- aka parallel forking to all address, so the AVPs are no longer
set, not being needed for a re-routing
Daniel-Constantin Mierla 8 年之前
父節點
當前提交
2ce57c44fe
共有 1 個文件被更改,包括 93 次插入0 次删除
  1. 93 0
      src/modules/dispatcher/dispatch.c

+ 93 - 0
src/modules/dispatcher/dispatch.c

@@ -75,6 +75,7 @@
 #define DS_ALG_WEIGHT 9
 #define DS_ALG_CALLLOAD 10
 #define DS_ALG_RELWEIGHT 11
+#define DS_ALG_PARALLEL 12
 
 static int _ds_table_version = DS_TABLE_VERSION;
 
@@ -1690,6 +1691,86 @@ static inline int ds_update_dst(
 	return 0;
 }
 
+/**
+ *
+ */
+int ds_add_branches(sip_msg_t *msg, ds_set_t *idx, unsigned int hash, int mode)
+{
+	unsigned int i = 0;
+	str ruri = STR_NULL;
+	sip_uri_t *puri = NULL;
+	char buri[MAX_URI_SIZE];
+
+	if(hash+1>=idx->nr) {
+		/* nothing to add */
+		return 0;
+	}
+
+	if(mode==1) {
+		/* ruri updates */
+		LM_DBG("adding branches with ruri\n");
+		if(parse_sip_msg_uri(msg)<0) {
+			LM_ERR("failed to parse sip msg uri\n");
+			return -1;
+		}
+		puri = &msg->parsed_uri;
+	} else {
+		/* duri updates */
+		LM_DBG("adding branches with duri\n");
+	}
+	for(i=hash+1; i<idx->nr; i++) {
+		if(mode==1) {
+			/* ruri updates */
+			if(puri->user.len<=0) {
+				/* no username to preserve */
+				if(append_branch(msg, &idx->dlist[i].uri, NULL, NULL,
+						Q_UNSPECIFIED, 0, idx->dlist[i].sock, NULL, 0,
+						NULL, NULL)<0) {
+					LM_ERR("failed to add branch with ruri\n");
+					return -1;
+				}
+			} else {
+				/* new uri from ruri username and dispatcher uri */
+				if(idx->dlist[i].uri.len<6) {
+					LM_WARN("invalid dispatcher uri - skipping (%u)\n", i);
+					continue;
+				}
+				if(strncmp(idx->dlist[i].uri.s, "sips:", 5)==0) {
+					ruri.len = snprintf(buri, MAX_URI_SIZE, "sips:%.*s@%.*s",
+							puri->user.len, puri->user.s,
+							idx->dlist[i].uri.len-5, idx->dlist[i].uri.s+5);
+				} else {
+					if(strncmp(idx->dlist[i].uri.s, "sip:", 4)==0) {
+						ruri.len = snprintf(buri, MAX_URI_SIZE, "sip:%.*s@%.*s",
+								puri->user.len, puri->user.s,
+								idx->dlist[i].uri.len-4, idx->dlist[i].uri.s+4);
+					} else {
+						LM_WARN("unsupported protocol schema - ignoring\n");
+						continue;
+					}
+				}
+				ruri.s = buri;
+				if(append_branch(msg, &ruri, NULL, NULL,
+						Q_UNSPECIFIED, 0, idx->dlist[i].sock, NULL, 0,
+						NULL, NULL)<0) {
+					LM_ERR("failed to add branch with user ruri\n");
+					return -1;
+				}
+			}
+		} else {
+			/* duri updates */
+			if(append_branch(msg, GET_RURI(msg), &idx->dlist[i].uri, NULL,
+					Q_UNSPECIFIED, 0, idx->dlist[i].sock, NULL, 0,
+					NULL, NULL)<0) {
+				LM_ERR("failed to add branch with duri\n");
+				return -1;
+			}
+		}
+
+	}
+	return 0;
+}
+
 /**
  *
  */
@@ -1839,6 +1920,9 @@ int ds_select_dst_limit(
 			hash = idx->rwlist[idx->rwlast];
 			idx->rwlast = (idx->rwlast + 1) % 100;
 			break;
+		case DS_ALG_PARALLEL: /* 12 - parallel dispatching */
+			hash = 0;
+			break;
 		default:
 			LM_WARN("algo %d not implemented - using first entry...\n", alg);
 			hash = 0;
@@ -1887,6 +1971,15 @@ int ds_select_dst_limit(
 	LM_DBG("selected [%d-%d/%d] <%.*s>\n", alg, set, hash,
 			idx->dlist[hash].uri.len, idx->dlist[hash].uri.s);
 
+	if(alg == DS_ALG_PARALLEL) {
+		if(ds_add_branches(msg, idx, hash, mode)<0) {
+			LM_ERR("failed to add additional branches\n");
+			/* one destination was already set - return success anyhow */
+			return 2;
+		}
+		return 1;
+	}
+
 	if(!(ds_flags & DS_FAILOVER_ON))
 		return 1;