Bladeren bron

rtpengine: fix queried_nodes_limit logic

Right now, even if the selected node returns error, the same node is still
selected and still queried for maximum of queried_nodes_limit times.

Don't retry to query the previous nodes, upon error returned (i.e.
Parallel session limit reached"). Instead, remember the queried nodes and try
to select between un-queried ones. Thus, rtpengine_offer() will select a proper,
available node which will be inserted in the hashtable and further used.
Stefan Mititelu 9 jaren geleden
bovenliggende
commit
131d8999c7
1 gewijzigde bestanden met toevoegingen van 47 en 14 verwijderingen
  1. 47 14
      modules/rtpengine/rtpengine.c

+ 47 - 14
modules/rtpengine/rtpengine.c

@@ -197,9 +197,10 @@ static int rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op
 static int fixup_set_id(void ** param, int param_no);
 static int set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2);
 static struct rtpp_set * select_rtpp_set(int id_set);
-static struct rtpp_node *select_rtpp_node_new(str, str, int);
+static struct rtpp_node *select_rtpp_node_new(str, str, int, struct rtpp_node **, int);
 static struct rtpp_node *select_rtpp_node_old(str, str, int);
-static struct rtpp_node *select_rtpp_node(str, str, int);
+static struct rtpp_node *select_rtpp_node(str, str, int, struct rtpp_node **, int);
+static int is_queried_node(struct rtpp_node *, struct rtpp_node **, int);
 static int build_rtpp_socks(unsigned int current_rtpp_no);
 static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *);
 static int get_extra_id(struct sip_msg* msg, str *id_str);
@@ -234,6 +235,7 @@ static int rtpengine_disable_tout = 60;
 static int rtpengine_allow_op = 0;
 static int rtpengine_retr = 5;
 static int rtpengine_tout_ms = 1000;
+static struct rtpp_node **queried_nodes_ptr = NULL;
 static int queried_nodes_limit = MAX_RTPP_TRIED_NODES;
 static pid_t mypid;
 static unsigned int myseqn = 0;
@@ -387,6 +389,24 @@ struct module_exports exports = {
 	child_init
 };
 
+/* check if the node is already queried */
+static int is_queried_node(struct rtpp_node *node, struct rtpp_node **queried_nodes_ptr, int queried_nodes)
+{
+	int i;
+
+	if (!queried_nodes_ptr) {
+		return 0;
+	}
+
+	for (i = 0; i < queried_nodes; i++) {
+		if (node == queried_nodes_ptr[i]) {
+			return 1;
+		}
+	}
+
+	return 0;
+}
+
 /* hide the node from display and disable it permanent */
 int rtpengine_delete_node(struct rtpp_node *rtpp_node)
 {
@@ -1906,10 +1926,17 @@ child_init(int rank)
 
 	rtpp_socks = (int*)pkg_malloc(sizeof(int)*(rtpp_socks_size));
 	if (!rtpp_socks) {
-		LM_ERR("no more pkg memory for rtpp_socks\n");
 		return -1;
 	}
 
+	// vector of pointers to queried nodes
+	queried_nodes_ptr = (struct rtpp_node**)pkg_malloc(queried_nodes_limit * sizeof(struct rtpp_node*));
+	if (!queried_nodes_ptr) {
+		LM_ERR("no more pkg memory for queried_nodes_ptr\n");
+		return -1;
+	}
+	memset(queried_nodes_ptr, 0, queried_nodes_limit * sizeof(struct rtpp_node*));
+
 	/* Iterate known RTP proxies - create sockets */
 	if (rtpp_socks_size) {
 		build_rtpp_socks(rtpp_socks_size);
@@ -2208,7 +2235,7 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
 	bencode_item_t *item, *resp;
 	str callid = STR_NULL, from_tag = STR_NULL, to_tag = STR_NULL, viabranch = STR_NULL;
 	str body = STR_NULL, error = STR_NULL;
-	int ret, queried_nodes;
+	int ret, queried_nodes = 0;
 	struct rtpp_node *node;
 	char *cp;
 	pv_value_t pv_val;
@@ -2333,14 +2360,14 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
 	if(msg->id != current_msg_id)
 		active_rtpp_set = default_rtpp_set;
 
-	queried_nodes = 0;
 select_node:
 	do {
-		if (++queried_nodes > queried_nodes_limit) {
+		if (queried_nodes >= queried_nodes_limit) {
 			LM_ERR("queried nodes limit reached\n");
 			goto error;
 		}
-		node = select_rtpp_node(callid, viabranch, 1);
+
+		node = select_rtpp_node(callid, viabranch, 1, queried_nodes_ptr, queried_nodes);
 		if (!node) {
 			LM_ERR("no available proxies\n");
 			goto error;
@@ -2351,7 +2378,10 @@ select_node:
 			node->rn_disabled = 1;
 			node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;
 		}
+
+		queried_nodes_ptr[queried_nodes++] = node;
 	} while (cp == NULL);
+
 	LM_DBG("proxy reply: %.*s\n", ret, cp);
 
 	set_rtp_inst_pvar(msg, &node->rn_url);
@@ -2366,8 +2396,7 @@ select_node:
 	if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
 		if (!bencode_dictionary_get_str(resp, "error-reason", &error)) {
 			LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
-		}
-		else {
+		} else {
 			if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) &&
 				(strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0))
 			{
@@ -2679,7 +2708,7 @@ static struct rtpp_set * select_rtpp_set(int id_set ){
  * run the selection algorithm and return the new selected node
  */
 static struct rtpp_node *
-select_rtpp_node_new(str callid, str viabranch, int do_test)
+select_rtpp_node_new(str callid, str viabranch, int do_test, struct rtpp_node **queried_nodes_ptr, int queried_nodes)
 {
 	struct rtpp_node* node;
 	unsigned i, sum, sumcut, weight_sum;
@@ -2707,7 +2736,7 @@ retry:
 		}
 
 		/* Select only between enabled machines */
-		if (!node->rn_disabled) {
+		if (!node->rn_disabled && !is_queried_node(node, queried_nodes_ptr, queried_nodes)) {
 			weight_sum += node->rn_weight;
 		}
 	}
@@ -2752,7 +2781,11 @@ retry:
 		if (node->rn_disabled)
 			continue;
 
-		/* Found enabled machine */
+		/* Select only between not already queried machines */
+		if (is_queried_node(node, queried_nodes_ptr, queried_nodes))
+			continue;
+
+		/* Found machine */
 		if (sumcut < node->rn_weight) {
 			lock_release(active_rtpp_set->rset_lock);
 			goto found;
@@ -2808,7 +2841,7 @@ select_rtpp_node_old(str callid, str viabranch, int do_test)
  * the call if some proxies were disabled or enabled (e.g. kamctl command)
  */
 static struct rtpp_node *
-select_rtpp_node(str callid, str viabranch, int do_test)
+select_rtpp_node(str callid, str viabranch, int do_test, struct rtpp_node **queried_nodes_ptr, int queried_nodes)
 {
 	struct rtpp_node *node = NULL;
 	unsigned int current_rtpp_no;
@@ -2837,7 +2870,7 @@ select_rtpp_node(str callid, str viabranch, int do_test)
 	// check node
 	if (!node) {
 		// run the selection algorithm
-		node = select_rtpp_node_new(callid, viabranch, do_test);
+		node = select_rtpp_node_new(callid, viabranch, do_test, queried_nodes_ptr, queried_nodes);
 
 		// check node
 		if (!node) {