2
0
Эх сурвалжийг харах

Merge pull request #219 from snen/dispatcher_weight_relative

dispatcher: relative weight distribution
Daniel-Constantin Mierla 10 жил өмнө
parent
commit
37e205c512

+ 125 - 10
modules/dispatcher/dispatch.c

@@ -88,6 +88,8 @@ int *next_idx   = NULL;
 static void ds_run_route(struct sip_msg *msg, str *uri, char *route);
 static void ds_run_route(struct sip_msg *msg, str *uri, char *route);
 
 
 void destroy_list(int);
 void destroy_list(int);
+void shuffle_uint100array(unsigned int* arr);
+int ds_reinit_rweight_on_state_change(int old_state, int new_state, ds_set_t *dset);
 
 
 /**
 /**
  *
  *
@@ -131,12 +133,13 @@ int ds_print_sets(void)
 	{
 	{
 		for(i=0; i<si->nr; i++)
 		for(i=0; i<si->nr; i++)
 		{
 		{
-			LM_DBG("dst>> %d %.*s %d %d (%.*s,%d,%d)\n", si->id,
+			LM_DBG("dst>> %d %.*s %d %d (%.*s,%d,%d,%d)\n", si->id,
 					si->dlist[i].uri.len, si->dlist[i].uri.s,
 					si->dlist[i].uri.len, si->dlist[i].uri.s,
 					si->dlist[i].flags, si->dlist[i].priority,
 					si->dlist[i].flags, si->dlist[i].priority,
 					si->dlist[i].attrs.duid.len, si->dlist[i].attrs.duid.s,
 					si->dlist[i].attrs.duid.len, si->dlist[i].attrs.duid.s,
 					si->dlist[i].attrs.maxload,
 					si->dlist[i].attrs.maxload,
-					si->dlist[i].attrs.weight);
+					si->dlist[i].attrs.weight,
+					si->dlist[i].attrs.rweight);
 		}
 		}
 		si = si->next;
 		si = si->next;
 	}
 	}
@@ -217,7 +220,18 @@ int ds_set_attrs(ds_dest_t *dest, str *attrs)
 		} else if(pit->name.len==6
 		} else if(pit->name.len==6
 				&& strncasecmp(pit->name.s, "socket", 6)==0) {
 				&& strncasecmp(pit->name.s, "socket", 6)==0) {
 			dest->attrs.socket = pit->body;
 			dest->attrs.socket = pit->body;
+		}else if(pit->name.len==7
+				&& strncasecmp(pit->name.s, "rweight", 7)==0) {
+			int tmp_rweight;
+			str2sint(&pit->body, &tmp_rweight);
+			if ( tmp_rweight>=1 && tmp_rweight<=100 ) {
+				dest->attrs.rweight = tmp_rweight;
+			}
+			else{
+				LM_ERR("rweight %d not in 1-100 range; skipped", tmp_rweight);
+			}
 		}
 		}
+
 	}
 	}
 	return 0;
 	return 0;
 }
 }
@@ -397,6 +411,82 @@ err:
 	return -1;
 	return -1;
 }
 }
 
 
+
+/* for internal usage; arr must be arr[100] */
+void shuffle_uint100array(unsigned int* arr){
+	if (arr == NULL)
+		return;
+	int k;
+	int j;
+	unsigned int t;
+	srand(time(0));
+	for (j=0; j<100; j++)
+	{
+		k = j + (rand() % (100-j));
+		t = arr[j];
+		arr[j] = arr[k];
+		arr[k] = t;
+	}
+}
+
+
+/**
+ * Initialize the relative weight distribution for a destination set
+ * - fill the array of 0..99 elements where to keep the index of the
+ *   destination address to be used. The Nth call will use
+ *   the address with the index at possition N%100
+ */
+int dp_init_relative_weights(ds_set_t *dset)
+{
+	int j;
+	int k;
+	int t;
+
+	if(dset==NULL || dset->dlist==NULL)
+		return -1;
+	
+	int rw_sum = 0;
+	/* find the sum of relative weights*/
+	for(j=0; j<dset->nr; j++){
+		if( ds_skip_dst(dset->dlist[j].flags ) )
+			continue;
+		rw_sum += dset->dlist[j].attrs.rweight;
+	}
+
+	if (rw_sum == 0){
+		return 0;
+	}
+
+	/* fill the array based on the relative weight of each destination */
+	t = 0;
+	for(j=0; j<dset->nr; j++)
+	{
+		if( ds_skip_dst(dset->dlist[j].flags ) )
+			continue;	
+
+		int current_slice = dset->dlist[j].attrs.rweight*100/rw_sum;  //truncate here; 
+		for(k=0; k<current_slice; k++)
+		{
+			dset->rwlist[t] = (unsigned int)j;
+			t++;
+		}
+	}
+	/* if the array was not completely filled (i.e., the sum of rweights is
+	 * less than 100 due to truncated), then use last address to fill the rest */
+	unsigned int last_insert = t>0? dset->rwlist[t-1] : (unsigned int)(dset->nr-1); 
+	for(j=t; j<100; j++)
+		dset->rwlist[j] = last_insert;
+
+	/* shuffle the content of the array in order to mix the selection
+	 * of the addresses (e.g., if first address has weight=20, avoid
+	 * sending first 20 calls to it, but ensure that within a 100 calls,
+	 * 20 go to first address */
+	shuffle_uint100array(dset->rwlist);
+
+	return 0;
+}
+
+
 /**
 /**
  * Initialize the weight distribution for a destination set
  * Initialize the weight distribution for a destination set
  * - fill the array of 0..99 elements where to keep the index of the
  * - fill the array of 0..99 elements where to keep the index of the
@@ -441,14 +531,7 @@ randomize:
 	 * of the addresses (e.g., if first address has weight=20, avoid
 	 * of the addresses (e.g., if first address has weight=20, avoid
 	 * sending first 20 calls to it, but ensure that within a 100 calls,
 	 * sending first 20 calls to it, but ensure that within a 100 calls,
 	 * 20 go to first address */
 	 * 20 go to first address */
-	srand(time(0));
-	for (j=0; j<100; j++)
-	{
-		k = j + (rand() % (100-j));
-		t = (int)dset->wlist[j];
-		dset->wlist[j] = dset->wlist[k];
-		dset->wlist[k] = (unsigned int)t;
-	}
+	shuffle_uint100array(dset->wlist);
 
 
 	return 0;
 	return 0;
 }
 }
@@ -488,6 +571,7 @@ int reindex_dests(int list_idx, int setn)
 		}
 		}
 		sp->dlist = dp0;
 		sp->dlist = dp0;
 		dp_init_weights(sp);
 		dp_init_weights(sp);
+		dp_init_relative_weights(sp);
 	}
 	}
 
 
 	LM_DBG("found [%d] dest sets\n", setn);
 	LM_DBG("found [%d] dest sets\n", setn);
@@ -1799,6 +1883,10 @@ int ds_select_dst_limit(sip_msg_t *msg, int set, int alg, unsigned int limit, in
 				}
 				}
 			}
 			}
 			break;
 			break;
+		case 11: /* relative weight based distribution */
+			hash = idx->rwlist[idx->rwlast];
+			idx->rwlast = (idx->rwlast+1) % 100;
+			break;
 		default:
 		default:
 			LM_WARN("algo %d not implemented - using first entry...\n", alg);
 			LM_WARN("algo %d not implemented - using first entry...\n", alg);
 			hash = 0;
 			hash = 0;
@@ -2292,6 +2380,8 @@ int ds_update_state(sip_msg_t *msg, int group, str *address, int state)
 				if(ds_skip_dst(old_state) && !ds_skip_dst(idx->dlist[i].flags))
 				if(ds_skip_dst(old_state) && !ds_skip_dst(idx->dlist[i].flags))
 					ds_run_route(msg, address, "dispatcher:dst-up");
 					ds_run_route(msg, address, "dispatcher:dst-up");
 			}
 			}
+			if (idx->dlist[i].attrs.rweight > 0)
+				ds_reinit_rweight_on_state_change(old_state, idx->dlist[i].flags, idx);
 
 
 			return 0;
 			return 0;
 		}
 		}
@@ -2343,6 +2433,26 @@ static void ds_run_route(sip_msg_t *msg, str *uri, char *route)
 	set_route_type(backup_rt);
 	set_route_type(backup_rt);
 }
 }
 
 
+
+/**
+ recalculate relative states if some destination state was changed
+ */
+int ds_reinit_rweight_on_state_change(int old_state, int new_state, ds_set_t *dset)
+{
+	if (dset == NULL){
+		LM_ERR("destination set is null\n");
+		return -1;
+	}
+	if (	(!ds_skip_dst(old_state) && ds_skip_dst(new_state)) ||
+		(ds_skip_dst(old_state) && !ds_skip_dst(new_state)) )
+	{
+		dp_init_relative_weights(dset);
+	}
+
+	return 0;
+}
+
+
 /**
 /**
  *
  *
  */
  */
@@ -2370,10 +2480,15 @@ int ds_reinit_state(int group, str *address, int state)
 				&& strncasecmp(idx->dlist[i].uri.s, address->s,
 				&& strncasecmp(idx->dlist[i].uri.s, address->s,
 					address->len)==0)
 					address->len)==0)
 		{
 		{
+			int old_state = idx->dlist[i].flags;
 			/* reset the bits used for states */
 			/* reset the bits used for states */
 			idx->dlist[i].flags &= ~(DS_STATES_ALL);
 			idx->dlist[i].flags &= ~(DS_STATES_ALL);
 			/* set the new states */
 			/* set the new states */
 			idx->dlist[i].flags |= state;
 			idx->dlist[i].flags |= state;
+			if (idx->dlist[i].attrs.rweight > 0){
+				ds_reinit_rweight_on_state_change(old_state, idx->dlist[i].flags, idx);
+			}
+
 			return 0;
 			return 0;
 		}
 		}
 	}
 	}

+ 3 - 0
modules/dispatcher/dispatch.h

@@ -149,6 +149,7 @@ typedef struct _ds_attrs
 	str socket;
 	str socket;
 	int maxload;
 	int maxload;
 	int weight;
 	int weight;
+	int rweight;
 } ds_attrs_t;
 } ds_attrs_t;
 
 
 typedef struct _ds_dest
 typedef struct _ds_dest
@@ -172,8 +173,10 @@ typedef struct _ds_set
 	int nr;				/*!< number of items in dst set */
 	int nr;				/*!< number of items in dst set */
 	int last;			/*!< last used item in dst set (round robin) */
 	int last;			/*!< last used item in dst set (round robin) */
 	int wlast;			/*!< last used item in dst set (by weight) */
 	int wlast;			/*!< last used item in dst set (by weight) */
+	int rwlast;			/*!< last used item in dst set (by relaitive weight) */
 	ds_dest_t *dlist;
 	ds_dest_t *dlist;
 	unsigned int wlist[100];
 	unsigned int wlist[100];
+	unsigned int rwlist[100];
 	struct _ds_set *next;
 	struct _ds_set *next;
 } ds_set_t;
 } ds_set_t;
 
 

+ 2 - 1
modules/dispatcher/dispatcher.c

@@ -1211,12 +1211,13 @@ static void dispatcher_rpc_list(rpc_t* rpc, void* ctx)
 					rpc->fault(ctx, 500, "Internal error creating dest struct");
 					rpc->fault(ctx, 500, "Internal error creating dest struct");
 					return;
 					return;
 				}
 				}
-				if(rpc->struct_add(wh, "SSddS",
+				if(rpc->struct_add(wh, "SSdddS",
 							"BODY", &(list->dlist[j].attrs.body),
 							"BODY", &(list->dlist[j].attrs.body),
 							"DUID", (list->dlist[j].attrs.duid.s)?
 							"DUID", (list->dlist[j].attrs.duid.s)?
 							&(list->dlist[j].attrs.duid):&data,
 							&(list->dlist[j].attrs.duid):&data,
 							"MAXLOAD", list->dlist[j].attrs.maxload,
 							"MAXLOAD", list->dlist[j].attrs.maxload,
 							"WEIGHT", list->dlist[j].attrs.weight,
 							"WEIGHT", list->dlist[j].attrs.weight,
+							"RWEIGHT", list->dlist[j].attrs.rweight,
 							"SOCKET", (list->dlist[j].attrs.socket.s)?
 							"SOCKET", (list->dlist[j].attrs.socket.s)?
 							&(list->dlist[j].attrs.socket):&data)<0)
 							&(list->dlist[j].attrs.socket):&data)<0)
 				{
 				{

+ 24 - 0
modules/dispatcher/doc/dispatcher_admin.xml

@@ -915,6 +915,24 @@ modparam("dispatcher", "force_dst", 1)
 				requests as it is the only SIP method creating a SIP call.
 				requests as it is the only SIP method creating a SIP call.
 				</para>
 				</para>
 			</listitem>
 			</listitem>
+			<listitem>
+				<para>
+				<quote>11</quote> - use relative weight based load distribution.
+				You have to set the attribute 'rweight' per each address in
+				destination set. Active host usage probability is 
+				rweight/(SUM of all active host rweights in destination group). 
+				</para>
+				<para>
+				The major difference from the weight distribution is the
+				probability recalculation according to rweight value in case of 
+				host enabling/disabling
+				</para>
+				<para>
+				For example, 100 calls in 3-hosts group with rweight params 1/2/1  
+				will be distributed as 25/50/25. After third host failing 
+				distribution will be changed to 33/67/0.
+				</para>
+			</listitem>
 			<listitem>
 			<listitem>
 				<para>
 				<para>
 				<quote>X</quote> - if the algorithm is not implemented, the
 				<quote>X</quote> - if the algorithm is not implemented, the
@@ -1449,6 +1467,12 @@ onreply_route {
 					100. The value represents the percent of calls to be
 					100. The value represents the percent of calls to be
 					sent to that gateways.
 					sent to that gateways.
 				</listitem>
 				</listitem>
+				<listitem>
+					'rweight' - used for relative weight based load 
+					distribution. It must be set to a positive integer value 
+					between 1 and 100 (otherwise host will be excluded from 
+					relative weight distribution type). 
+				</listitem>
 				<listitem>
 				<listitem>
 					'socket' - used to set the sending socket for the gateway.
 					'socket' - used to set the sending socket for the gateway.
 					It is used for sending the SIP traffic as well as
 					It is used for sending the SIP traffic as well as