浏览代码

dispatcher: algorithm 13 (#2493)

- latency optimized round-robin with failover
- optionally congestion can be use instead of latency

- thanks to Salman Ali (asalman18) for the review
Julien Chavanton 4 年之前
父节点
当前提交
90e3033c51
共有 3 个文件被更改,包括 168 次插入5 次删除
  1. 129 4
      src/modules/dispatcher/dispatch.c
  2. 1 0
      src/modules/dispatcher/dispatch.h
  3. 38 1
      src/modules/dispatcher/doc/dispatcher_admin.xml

+ 129 - 4
src/modules/dispatcher/dispatch.c

@@ -78,6 +78,7 @@
 #define DS_ALG_CALLLOAD 10
 #define DS_ALG_RELWEIGHT 11
 #define DS_ALG_PARALLEL 12
+#define DS_ALG_LATENCY 13
 
 /* increment call load */
 #define DS_LOAD_INC(dgrp, didx) do { \
@@ -2066,6 +2067,34 @@ int ds_select_dst_limit(sip_msg_t *msg, int set, int alg, uint32_t limit,
 	return ret;
 }
 
+typedef struct sorted_ds {
+	int idx;
+	int priority;
+} sorted_ds_t;
+
+int ds_manage_routes_fill_reodered_xavp(sorted_ds_t *ds_sorted, ds_set_t *idx, ds_select_state_t *rstate)
+{
+	int i;
+	if(!(ds_flags & DS_FAILOVER_ON))
+		return 1;
+	for(i=0; i < idx->nr && rstate->cnt < rstate->limit; i++) {
+		if(ds_sorted[i].idx < 0 || ds_skip_dst(idx->dlist[i].flags)
+				|| (ds_use_default != 0 && ds_sorted[i].idx == (idx->nr - 1))) {
+			continue;
+		}
+		if(ds_add_xavp_record(idx, ds_sorted[i].idx, rstate->setid, rstate->alg,
+					&rstate->lxavp)<0) {
+			LM_ERR("failed to add destination in the xavp (%d/%d)\n",
+					ds_sorted[i].idx, rstate->setid);
+			return -1;
+		}
+		LM_DBG("destination added in the xavp (%d/%d)\n",
+					ds_sorted[i].idx, rstate->setid);
+		rstate->cnt++;
+	}
+	return 0;
+}
+
 int ds_manage_routes_fill_xavp(unsigned int hash, ds_set_t *idx, ds_select_state_t *rstate)
 {
 	int i;
@@ -2125,6 +2154,80 @@ int ds_manage_routes_fill_xavp(unsigned int hash, ds_set_t *idx, ds_select_state
 	return 0;
 }
 
+
+void ds_sorted_by_priority(sorted_ds_t * sorted_ds, int size) {
+	int i,ii;
+	for(i=0;i<size;++i) {
+		for(ii=1;ii<size;++ii) {
+			sorted_ds_t temp;
+			if(sorted_ds[ii-1].priority < sorted_ds[ii].priority) {
+				temp.idx = sorted_ds[ii].idx;
+				temp.priority = sorted_ds[ii].priority;
+				sorted_ds[ii].idx = sorted_ds[ii-1].idx;
+				sorted_ds[ii].priority = sorted_ds[ii-1].priority;
+				sorted_ds[ii-1].idx = temp.idx;
+				sorted_ds[ii-1].priority = temp.priority;
+			}
+		}
+	}
+}
+
+int ds_manage_route_algo13(ds_set_t *idx, ds_select_state_t *rstate) {
+	int hash = idx->last;
+	int y = 0;
+	int z = hash;
+	int active_priority = 0;
+	sorted_ds_t *ds_sorted = pkg_malloc(sizeof(sorted_ds_t) * idx->nr);
+	if(ds_sorted == NULL) {
+		LM_ERR("no more pkg\n");
+		return -1;
+	}
+
+	for(y=0; y<idx->nr ;y++) {
+		int latency_proirity_handicap = 0;
+		ds_dest_t * ds_dest = &idx->dlist[z];
+		int gw_priority = ds_dest->priority;
+		int gw_latency = ds_dest->latency_stats.estimate;
+		int gw_inactive = ds_skip_dst(ds_dest->flags);
+		// if cc is enabled, the latency is the congestion ms instead of the estimated latency.
+		if (ds_dest->attrs.congestion_control)
+			gw_latency = ds_dest->latency_stats.estimate - ds_dest->latency_stats.average;
+		if(!gw_inactive) {
+			if(gw_latency > gw_priority && gw_priority > 0)
+				latency_proirity_handicap = gw_latency / gw_priority;
+			ds_dest->attrs.rpriority = gw_priority - latency_proirity_handicap;
+			if(ds_dest->attrs.rpriority < 1 && gw_priority > 0)
+				ds_dest->attrs.rpriority = 1;
+			if(ds_dest->attrs.rpriority > active_priority) {
+				hash = z;
+				active_priority = ds_dest->attrs.rpriority;
+			}
+			ds_sorted[y].idx = z;
+			ds_sorted[y].priority = ds_dest->attrs.rpriority;
+			LM_DBG("[active]idx[%d]uri[%.*s]priority[%d-%d=%d]latency[%dms]flag[%d]\n",
+				z, ds_dest->uri.len, ds_dest->uri.s,
+				gw_priority, latency_proirity_handicap,
+				ds_dest->attrs.rpriority, gw_latency, ds_dest->flags);
+		} else {
+			ds_sorted[y].idx = -1;
+			ds_sorted[y].priority = -1;
+			LM_DBG("[inactive]idx[%d]uri[%.*s]priority[%d]latency[%dms]flag[%d]",
+				z, ds_dest->uri.len, ds_dest->uri.s,
+				gw_priority, gw_latency, ds_dest->flags);
+		}
+		if(ds_use_default != 0 && idx->nr != 1)
+			z = (z + 1) % (idx->nr - 1);
+		else
+			z = (z + 1) % idx->nr;
+	}
+	idx->last = (hash + 1) % idx->nr;
+	LM_DBG("priority[%d]gateway_selected[%d]next_index[%d]\n", active_priority, hash, idx->last);
+	ds_sorted_by_priority(ds_sorted, idx->nr);
+	ds_manage_routes_fill_reodered_xavp(ds_sorted, idx, rstate);
+	pkg_free(ds_sorted);
+	return hash;
+}
+
 /**
  *
  */
@@ -2135,6 +2238,7 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
 	ds_set_t *idx = NULL;
 	int ulast = 0;
 	int vlast = 0;
+	int xavp_filled = 0;
 
 	if(msg == NULL) {
 		LM_ERR("bad parameters\n");
@@ -2270,6 +2374,14 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
 		case DS_ALG_PARALLEL: /* 12 - parallel dispatching */
 			hash = 0;
 			break;
+		case DS_ALG_LATENCY: /* 13 - latency optimized round-robin with failover */
+			lock_get(&idx->lock);
+			hash = ds_manage_route_algo13(idx, rstate);
+			lock_release(&idx->lock);
+			if (hash == -1)
+				return -1;
+			xavp_filled = 1;
+			break;
 		default:
 			LM_WARN("algo %d not implemented - using first entry...\n",
 					rstate->alg);
@@ -2285,7 +2397,7 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
 	i = hash;
 
 	/* if selected address is inactive, find next active */
-	while(ds_skip_dst(idx->dlist[i].flags)) {
+	while(!xavp_filled && ds_skip_dst(idx->dlist[i].flags)) {
 		if(ds_use_default != 0 && idx->nr != 1)
 			i = (i + 1) % (idx->nr - 1);
 		else
@@ -2344,8 +2456,11 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
 		return 1;
 	}
 
-	if (ds_manage_routes_fill_xavp(hash, idx, rstate) == -1)
-		return -1;
+	if(!xavp_filled) {
+		if(ds_manage_routes_fill_xavp(hash, idx, rstate) == -1){
+			return -1;
+		}
+	}
 
 	/* add default dst to last position in XAVP list */
 	if(ds_use_default != 0 && hash != idx->nr - 1
@@ -2715,12 +2830,22 @@ int ds_update_latency(int group, str *address, int code)
 			int latency_ms;
 			/* Destination address found, this is the gateway that was pinged. */
 			state = ds_dest->flags;
+			if (!(state & DS_PROBING_DST)) {
+				i++;
+				continue;
+			}
 			if (code == 408 && latency_stats->timeout < UINT32_MAX)
 				latency_stats->timeout++;
 			gettimeofday(&now, NULL);
 			latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
 		            + (now.tv_usec - latency_stats->start.tv_usec)/1000;
-			latency_stats_update(latency_stats, latency_ms);
+			if (code != 408)
+				latency_stats_update(latency_stats, latency_ms);
+
+			LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]rweight[%d]\n",
+					latency_stats->count, latency_ms,
+					latency_stats->average, address->len, address->s,
+					code, ds_dest->attrs.rweight);
 
 			/* Adjusting weight using congestion detection based on latency estimator. */
 			if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight > 0) {

+ 1 - 0
src/modules/dispatcher/dispatch.h

@@ -190,6 +190,7 @@ typedef struct _ds_attrs {
 	int congestion_control;
 	str ping_from;
 	str obproxy;
+	int rpriority;
 } ds_attrs_t;
 
 typedef struct _ds_latency_stats {

+ 38 - 1
src/modules/dispatcher/doc/dispatcher_admin.xml

@@ -1241,7 +1241,7 @@ modparam("dispatcher", "reload_delta", 1)
 				</para>
 				<para>
 				Using this algorithm, you can also enable congestion control by setting the
-				attibute 'cc=1', when 'cc' is enabled the 'weight' attribute will also be
+				attribute 'cc=1', when 'cc' is enabled the 'weight' attribute will also be
 				used to control congestion tolerance. When facing congestion the weight of
 				a gateway is lowered by 1 for every ms of estimated congestion, a 'rweight'
 				value of 50 is recommended. See the example "configuring load balancing with
@@ -1261,6 +1261,43 @@ modparam("dispatcher", "reload_delta", 1)
 				making sense in this case.
 				</para>
 			</listitem>
+			<listitem>
+				<para>
+				<quote>13</quote> - latency optimized dispatching
+				</para>
+				<para>
+				- The algorithm will load balance using round-robin prioritizing the gateways with the highest priority.
+				</para>
+				<para>
+				- If ds_ping_latency_stats is active the algorithm will adjust the priority of the gateway automatically,
+				the priority will be lowered by 1 point every time the latency ms is as high as the priority.
+				</para>
+				<para>
+				- If the attribute 'cc=1' is set, the latency used is congestion ms : estimate (current latency ms) - average (normal condition latency ms).
+				</para>
+				<example>
+				<title><function>latency_optimized_dispatching</function> usage</title>
+				<programlisting format="linespecific">
+Using this simple formula :
+    ADJUSTED_PRIORITY = PRIORITY - (ESTIMATED_LATENCY_MS/PRIORITY)
+
+GATEWAY | PRIORITY | ESTIMATED | ADJUSTED | LOAD
+   #    |          |  LATENCY  | PRIORITY | DISTRIBUTION
+   1    |    30    |    21     |    30    | 33%
+   2    |    30    |    91     |    27    | 0%
+   3    |    30    |    61     |    28    | 0%
+   4    |    30    |    19     |    30    | 33%
+   5    |    30    |    32     |    29    | 0%
+   6    |    30    |    0      |    30    | 33%
+   7    |    30    |    201    |    24    | 0%
+
+
+With congestion control the formula becomes :
+    CONGESTION_MS = CURRENT_LATENCY_MS - NORMAL_CONDITION_LATENCY_MS
+    ADJUSTED_PRIORITY = PRIORITY - (CONGESTION_MS/PRIORITY)
+				</programlisting>
+				</example>
+			</listitem>
 			<listitem>
 				<para>
 				<quote>X</quote> - if the algorithm is not implemented, the