Parcourir la source

rtpengine: rebuild rtpp_socks before send command

- update node selection only from displayed nodes
- update rtpp_set_list alocation from get_rtpp_set() to mod_init();
thus all procs will have reference to the list of sets.
- add locks for rtpp_set_head and rtpp_set

- make rtpp_no shm variable guarded by rtpp_no_lock
- add rtpp_socks_size pkg variable
- compare rtpp_socks_size with rtpp_no and rebuild rtpp_socks if they differ
Stefan Mititelu il y a 9 ans
Parent
commit
789dec73bd
3 fichiers modifiés avec 274 ajouts et 49 suppressions
  1. 270 47
      modules/rtpengine/rtpengine.c
  2. 2 0
      modules/rtpengine/rtpengine.h
  3. 2 2
      modules/rtpengine/rtpengine_db.c

+ 270 - 47
modules/rtpengine/rtpengine.c

@@ -200,6 +200,7 @@ static struct rtpp_set * select_rtpp_set(int id_set);
 static struct rtpp_node *select_rtpp_node_new(str, str, int);
 static struct rtpp_node *select_rtpp_node_old(str, str, int);
 static struct rtpp_node *select_rtpp_node(str, str, int);
+static int build_rtpp_socks(unsigned int current_rtpp_no);
 static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *);
 static int get_extra_id(struct sip_msg* msg, str *id_str);
 
@@ -259,8 +260,10 @@ static str rtp_inst_pv_param = {NULL, 0};
 static pv_spec_t *rtp_inst_pvar = NULL;
 
 /* array with the sockets used by rtpporxy (per process)*/
-static unsigned int rtpp_no = 0;
+static unsigned int *rtpp_no = 0;
+static gen_lock_t *rtpp_no_lock = 0;
 static int *rtpp_socks = 0;
+static unsigned int rtpp_socks_size = 0;
 
 static int setid_avp_type;
 static int_str setid_avp;
@@ -398,10 +401,12 @@ int rtpengine_delete_node_set(struct rtpp_set *rtpp_list)
 {
 	struct rtpp_node *rtpp_node;
 
+	lock_get(rtpp_list->rset_lock);
 	for(rtpp_node = rtpp_list->rn_first; rtpp_node != NULL;
 			rtpp_node = rtpp_node->rn_next) {
 		rtpengine_delete_node(rtpp_node);
 	}
+	lock_release(rtpp_list->rset_lock);
 
 	return 1;
 }
@@ -415,10 +420,12 @@ int rtpengine_delete_node_all()
 		return 1;
 	}
 
+	lock_get(rtpp_set_list->rset_head_lock);
 	for(rtpp_list = rtpp_set_list->rset_first; rtpp_list != NULL;
 			rtpp_list = rtpp_list->rset_next) {
 		rtpengine_delete_node_set(rtpp_list);
 	}
+	lock_release(rtpp_set_list->rset_head_lock);
 
 	return 1;
 }
@@ -635,13 +642,16 @@ struct rtpp_node *get_rtpp_node(struct rtpp_set *rtpp_list, str *url)
 		return NULL;
 	}
 
+	lock_get(rtpp_list->rset_lock);
 	rtpp_node = rtpp_list->rn_first;
 	while (rtpp_node) {
 		if (str_cmp(&rtpp_node->rn_url, url) == 0) {
+			lock_release(rtpp_list->rset_lock);
 			return rtpp_node;
 		}
 		rtpp_node = rtpp_node->rn_next;
 	}
+	lock_release(rtpp_list->rset_lock);
 
 	return NULL;
 }
@@ -660,8 +670,9 @@ struct rtpp_set *get_rtpp_set(int set_id)
 
 	my_current_id = set_id;
 	/*search for the current_id*/
+	lock_get(rtpp_set_list->rset_head_lock);
 	rtpp_list = rtpp_set_list ? rtpp_set_list->rset_first : 0;
-	while( rtpp_list != 0 && rtpp_list->id_set!=my_current_id)
+	while (rtpp_list != 0 && rtpp_list->id_set!=my_current_id)
 		rtpp_list = rtpp_list->rset_next;
 
 	if (rtpp_list==NULL)
@@ -669,11 +680,29 @@ struct rtpp_set *get_rtpp_set(int set_id)
 		rtpp_list = shm_malloc(sizeof(struct rtpp_set));
 		if(!rtpp_list)
 		{
+			lock_release(rtpp_set_list->rset_head_lock);
 			LM_ERR("no shm memory left to create new rtpproxy set %d\n", my_current_id);
 			return NULL;
 		}
 		memset(rtpp_list, 0, sizeof(struct rtpp_set));
 		rtpp_list->id_set = my_current_id;
+		rtpp_list->rset_lock = lock_alloc();
+		if (!rtpp_list->rset_lock) {
+			lock_release(rtpp_set_list->rset_head_lock);
+			LM_ERR("no shm memory left to create rtpproxy set lock\n");
+			shm_free(rtpp_list);
+			rtpp_list = NULL;
+			return NULL;
+		}
+		if (lock_init(rtpp_list->rset_lock) == 0) {
+			lock_release(rtpp_set_list->rset_head_lock);
+			LM_ERR("could not init rtpproxy set lock\n");
+			lock_dealloc((void*)rtpp_list->rset_lock);
+			rtpp_list->rset_lock = NULL;
+			shm_free(rtpp_list);
+			rtpp_list = NULL;
+			return NULL;
+		}
 		new_list = 1;
 	}
 	else {
@@ -682,15 +711,6 @@ struct rtpp_set *get_rtpp_set(int set_id)
 
 	if (new_list)
 	{
-		if(!rtpp_set_list){/*initialize the list of set*/
-			rtpp_set_list = shm_malloc(sizeof(struct rtpp_set_head));
-			if(!rtpp_set_list){
-				LM_ERR("no shm memory left to create list of proxysets\n");
-				return NULL;
-			}
-			memset(rtpp_set_list, 0, sizeof(struct rtpp_set_head));
-		}
-
 		/*update the list of set info*/
 		if (!rtpp_set_list->rset_first)
 		{
@@ -708,6 +728,8 @@ struct rtpp_set *get_rtpp_set(int set_id)
 			default_rtpp_set = rtpp_list;
 		}
 	}
+	lock_release(rtpp_set_list->rset_head_lock);
+
 	return rtpp_list;
 }
 
@@ -757,7 +779,9 @@ int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy,
 		}
 		memset(pnode, 0, sizeof(*pnode));
 
-		pnode->idx = rtpp_no++;
+		lock_get(rtpp_no_lock);
+		pnode->idx = *rtpp_no;
+
 		if (ticks == MI_MAX_RECHECK_TICKS) {
 			pnode->rn_recheck_ticks = ticks;
 		} else {
@@ -769,7 +793,7 @@ int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy,
 		pnode->rn_displayed = 1;
 		pnode->rn_url.s = shm_malloc(p2 - p1 + 1);
 		if (pnode->rn_url.s == NULL) {
-			rtpp_no--;
+			lock_release(rtpp_no_lock);
 			shm_free(pnode);
 			LM_ERR("no shm memory left\n");
 			return -1;
@@ -790,8 +814,8 @@ int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy,
 			pnode->rn_umode = 0;
 			pnode->rn_address += 5;
 		} else {
+			lock_release(rtpp_no_lock);
 			LM_WARN("Node address must start with 'udp:' or 'udp6:' or 'unix:'. Ignore '%s'.\n", pnode->rn_address);
-			rtpp_no--;
 			shm_free(pnode->rn_url.s);
 			shm_free(pnode);
 
@@ -813,8 +837,8 @@ int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy,
 			s1.s = p1;
 			s1.len = strlen(p1);
 			if (str2int(&s1, &port) < 0 || port > 0xFFFF) {
+				lock_release(rtpp_no_lock);
 				LM_WARN("Node address must end with a valid port number. Ignore '%s'.\n", pnode->rn_address);
-				rtpp_no--;
 				shm_free(pnode->rn_url.s);
 				shm_free(pnode);
 
@@ -828,13 +852,16 @@ int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy,
 
 		/* If node found in set, update it */
 		rtpp_node = get_rtpp_node(rtpp_list, &pnode->rn_url);
+
+		lock_get(rtpp_list->rset_lock);
 		if (rtpp_node) {
 			rtpp_node->rn_disabled = pnode->rn_disabled;
 			rtpp_node->rn_displayed = pnode->rn_displayed;
 			rtpp_node->rn_recheck_ticks = pnode->rn_recheck_ticks;
 			rtpp_node->rn_weight = pnode->rn_weight;
+			lock_release(rtpp_list->rset_lock);
+			lock_release(rtpp_no_lock);
 
-			rtpp_no--;
 			shm_free(pnode->rn_url.s);
 			shm_free(pnode);
 
@@ -853,6 +880,10 @@ int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy,
 
 		rtpp_list->rn_last = pnode;
 		rtpp_list->rtpp_node_count++;
+		lock_release(rtpp_list->rset_lock);
+
+		*rtpp_no = *rtpp_no + 1;
+		lock_release(rtpp_no_lock);
 
 		if (!isDB) {
 			continue;
@@ -1069,12 +1100,18 @@ static struct mi_root* mi_enable_rtp_proxy(struct mi_root *cmd_tree, void *param
 		found = MI_FOUND_ALL;
 	}
 
+	lock_get(rtpp_set_list->rset_head_lock);
 	for(rtpp_list = rtpp_set_list->rset_first; rtpp_list != NULL;
 			rtpp_list = rtpp_list->rset_next) {
 
+		lock_get(rtpp_list->rset_lock);
 		for(crt_rtpp = rtpp_list->rn_first; crt_rtpp != NULL;
 				crt_rtpp = crt_rtpp->rn_next) {
 
+			if (!crt_rtpp->rn_displayed) {
+				continue;
+			}
+
 			/* found a matching rtpp - show it */
 			if (found == MI_FOUND_ALL ||
 			   (crt_rtpp->rn_url.len == rtpp_url.len &&
@@ -1107,7 +1144,9 @@ static struct mi_root* mi_enable_rtp_proxy(struct mi_root *cmd_tree, void *param
 				}
 			}
 		}
+		lock_release(rtpp_list->rset_lock);
 	}
+	lock_release(rtpp_set_list->rset_head_lock);
 
 	root = init_mi_tree(200, MI_OK_S, MI_OK_LEN);
 	if (!root) {
@@ -1297,9 +1336,11 @@ static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree, void* param)
 		found = MI_FOUND_ALL;
 	}
 
+	lock_get(rtpp_set_list->rset_head_lock);
 	for(rtpp_list = rtpp_set_list->rset_first; rtpp_list != NULL;
 			rtpp_list = rtpp_list->rset_next) {
 
+		lock_get(rtpp_list->rset_lock);
 		for(crt_rtpp = rtpp_list->rn_first; crt_rtpp != NULL;
 				crt_rtpp = crt_rtpp->rn_next) {
 
@@ -1313,6 +1354,8 @@ static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree, void* param)
 			   strncmp(crt_rtpp->rn_url.s, rtpp_url.s, rtpp_url.len) == 0)) {
 
 				if (add_rtpp_node_info(node, crt_rtpp, rtpp_list) < 0) {
+					lock_release(rtpp_list->rset_lock);
+					lock_release(rtpp_set_list->rset_head_lock);
 					goto error;
 				}
 
@@ -1321,7 +1364,9 @@ static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree, void* param)
 				}
 			}
 		}
+		lock_release(rtpp_list->rset_lock);
 	}
+	lock_release(rtpp_set_list->rset_head_lock);
 
 	switch (found) {
 		case MI_FOUND_ALL:
@@ -1383,12 +1428,18 @@ static struct mi_root* mi_ping_rtp_proxy(struct mi_root* cmd_tree, void* param)
 		found = MI_FOUND_ALL;
 	}
 
+	lock_get(rtpp_set_list->rset_head_lock);
 	for (rtpp_list = rtpp_set_list->rset_first; rtpp_list != NULL;
 			rtpp_list = rtpp_list->rset_next) {
 
+		lock_get(rtpp_list->rset_lock);
 		for (crt_rtpp = rtpp_list->rn_first; crt_rtpp != NULL;
 				crt_rtpp = crt_rtpp->rn_next) {
 
+			if (!crt_rtpp->rn_displayed) {
+				continue;
+			}
+
 			/* found a matching rtpp - ping it */
 			if (found == MI_FOUND_ALL ||
 			   (crt_rtpp->rn_url.len == rtpp_url.len &&
@@ -1407,7 +1458,9 @@ static struct mi_root* mi_ping_rtp_proxy(struct mi_root* cmd_tree, void* param)
 				}
 			}
 		}
+		lock_release(rtpp_list->rset_lock);
 	}
+	lock_release(rtpp_set_list->rset_head_lock);
 
 	root = init_mi_tree(200, MI_OK_S, MI_OK_LEN);
 	if (!root) {
@@ -1510,6 +1563,7 @@ static struct mi_root*
 mi_reload_rtp_proxy(struct mi_root* cmd_tree, void* param)
 {
 	struct mi_root *root = NULL;
+	unsigned int current_rtpp_no;
 
 	if (rtpp_db_url.s == NULL) {
 		// no database
@@ -1527,6 +1581,14 @@ mi_reload_rtp_proxy(struct mi_root* cmd_tree, void* param)
 				return 0;
 			}
 		} else {
+			lock_get(rtpp_no_lock);
+			current_rtpp_no = *rtpp_no;
+			lock_release(rtpp_no_lock);
+
+			if (rtpp_socks_size != current_rtpp_no) {
+				build_rtpp_socks(current_rtpp_no);
+			}
+
 			// success reloading from database
 			root = init_mi_tree(200, MI_DB_OK, MI_DB_OK_LEN);
 			if (!root) {
@@ -1554,9 +1616,44 @@ mod_init(void)
 		return -1;
 	}
 
-	/* any rtpproxy configured? */
-	if(rtpp_set_list)
-		default_rtpp_set = select_rtpp_set(DEFAULT_RTPP_SET_ID);
+	rtpp_no = (unsigned int*)shm_malloc(sizeof(unsigned int));
+	if (!rtpp_no) {
+		LM_ERR("no more shm memory for rtpp_no\n");
+		return -1;
+	}
+	*rtpp_no = 0;
+
+	rtpp_no_lock = lock_alloc();
+	if (!rtpp_no_lock) {
+		LM_ERR("no more shm memory for rtpp_no_lock\n");
+		return -1;
+	}
+
+	if (lock_init(rtpp_no_lock) == 0) {
+		LM_ERR("could not init rtpp_no_lock\n");
+		return -1;
+	}
+
+	/* initialize the list of set; mod_destroy does shm_free() if fail */
+	if (!rtpp_set_list) {
+		rtpp_set_list = shm_malloc(sizeof(struct rtpp_set_head));
+		if(!rtpp_set_list){
+			LM_ERR("no shm memory left to create list of proxysets\n");
+			return -1;
+		}
+		memset(rtpp_set_list, 0, sizeof(struct rtpp_set_head));
+
+		rtpp_set_list->rset_head_lock = lock_alloc();
+		if (!rtpp_set_list->rset_head_lock) {
+			LM_ERR("no shm memory left to create list of proxysets lock\n");
+			return -1;
+		}
+
+		if (lock_init(rtpp_set_list->rset_head_lock) == 0) {
+			LM_ERR("could not init rtpproxy list of proxysets lock\n");
+			return -1;
+		}
+	}
 
 	if (rtpp_db_url.s == NULL)
 	{
@@ -1583,6 +1680,10 @@ mod_init(void)
 		}
 	}
 
+	/* any rtpproxy configured? */
+	if (rtpp_set_list)
+		default_rtpp_set = select_rtpp_set(DEFAULT_RTPP_SET_ID);
+
 	if (rtp_inst_pv_param.s) {
 		rtp_inst_pv_param.len = strlen(rtp_inst_pv_param.s);
 		rtp_inst_pvar = pv_cache_get(&rtp_inst_pv_param);
@@ -1681,11 +1782,8 @@ mod_init(void)
 	return 0;
 }
 
-
-static int
-child_init(int rank)
-{
-	int n;
+static int build_rtpp_socks(unsigned int current_rtpp_no) {
+	int n, i;
 	char *cp;
 	struct addrinfo hints, *res;
 	struct rtpp_set  *rtpp_list;
@@ -1694,22 +1792,26 @@ child_init(int rank)
 	int ip_mtu_discover = IP_PMTUDISC_DONT;
 #endif
 
-	if(rtpp_set_list==NULL )
-		return 0;
-
-	/* Iterate known RTP proxies - create sockets */
-	mypid = getpid();
+	// close current sockets
+	for (i = 0; i < rtpp_socks_size; i++) {
+		if (rtpp_socks[i] >= 0) {
+			close(rtpp_socks[i]);
+		}
+	}
 
-	rtpp_socks = (int*)pkg_malloc( sizeof(int)*rtpp_no );
-	if (rtpp_socks==NULL) {
-		LM_ERR("no more pkg memory\n");
+	rtpp_socks_size = current_rtpp_no;
+	rtpp_socks = (int*)pkg_realloc(rtpp_socks, sizeof(int)*(rtpp_socks_size));
+	if (!rtpp_socks) {
+		LM_ERR("no more pkg memory for rtpp_socks\n");
 		return -1;
 	}
 
-	for(rtpp_list = rtpp_set_list->rset_first; rtpp_list != 0;
-		rtpp_list = rtpp_list->rset_next){
+	lock_get(rtpp_set_list->rset_head_lock);
+	for (rtpp_list = rtpp_set_list->rset_first; rtpp_list != 0;
+		rtpp_list = rtpp_list->rset_next) {
 
-		for (pnode=rtpp_list->rn_first; pnode!=0; pnode = pnode->rn_next){
+		lock_get(rtpp_list->rset_lock);
+		for (pnode=rtpp_list->rn_first; pnode!=0; pnode = pnode->rn_next) {
 			char *hostname;
 
 			if (pnode->rn_umode == 0) {
@@ -1724,7 +1826,8 @@ child_init(int rank)
 			hostname = (char*)pkg_malloc(sizeof(char) * (strlen(pnode->rn_address) + 1));
 			if (hostname==NULL) {
 				LM_ERR("no more pkg memory\n");
-				return -1;
+				rtpp_socks[pnode->idx] = -1;
+				continue;
 			}
 			strcpy(hostname, pnode->rn_address);
 
@@ -1743,7 +1846,8 @@ child_init(int rank)
 			if ((n = getaddrinfo(hostname, cp, &hints, &res)) != 0) {
 				LM_ERR("%s\n", gai_strerror(n));
 				pkg_free(hostname);
-				return -1;
+				rtpp_socks[pnode->idx] = -1;
+				continue;
 			}
 			pkg_free(hostname);
 
@@ -1752,7 +1856,7 @@ child_init(int rank)
 			if (rtpp_socks[pnode->idx] == -1) {
 				LM_ERR("can't create socket\n");
 				freeaddrinfo(res);
-				return -1;
+				continue;
 			}
 
 #ifdef IP_MTU_DISCOVER
@@ -1766,7 +1870,7 @@ child_init(int rank)
 				close(rtpp_socks[pnode->idx]);
 				rtpp_socks[pnode->idx] = -1;
 				freeaddrinfo(res);
-				return -1;
+				continue;
 			}
 
 			if (connect(rtpp_socks[pnode->idx], res->ai_addr, res->ai_addrlen) == -1) {
@@ -1774,13 +1878,41 @@ child_init(int rank)
 				close(rtpp_socks[pnode->idx]);
 				rtpp_socks[pnode->idx] = -1;
 				freeaddrinfo(res);
-				return -1;
+				continue;
 			}
 
 			freeaddrinfo(res);
 rptest:
 			pnode->rn_disabled = rtpp_test(pnode, 0, 1);
 		}
+		lock_release(rtpp_list->rset_lock);
+	}
+	lock_release(rtpp_set_list->rset_head_lock);
+
+	return 0;
+}
+
+static int
+child_init(int rank)
+{
+	if(!rtpp_set_list)
+		return 0;
+
+	mypid = getpid();
+
+	lock_get(rtpp_no_lock);
+	rtpp_socks_size = *rtpp_no;
+	lock_release(rtpp_no_lock);
+
+	rtpp_socks = (int*)pkg_malloc(sizeof(int)*(rtpp_socks_size));
+	if (!rtpp_socks) {
+		LM_ERR("no more pkg memory for rtpp_socks\n");
+		return -1;
+	}
+
+	/* Iterate known RTP proxies - create sockets */
+	if (rtpp_socks_size) {
+		build_rtpp_socks(rtpp_socks_size);
 	}
 
 	return 0;
@@ -1796,11 +1928,39 @@ static void mod_destroy(void)
 	if (natping_state)
 		shm_free(natping_state);
 
-	if(rtpp_set_list == NULL)
+	if (rtpp_no) {
+		shm_free(rtpp_no);
+		rtpp_no = NULL;
+	}
+
+	if (rtpp_no_lock) {
+		lock_destroy(rtpp_no_lock);
+		lock_dealloc(rtpp_no_lock);
+		rtpp_no_lock = NULL;
+	}
+
+	if (!rtpp_set_list) {
+		return;
+	}
+
+	if (!rtpp_set_list->rset_head_lock) {
+		shm_free(rtpp_set_list);
+		rtpp_set_list = NULL;
 		return;
+	}
 
+	lock_get(rtpp_set_list->rset_head_lock);
 	for(crt_list = rtpp_set_list->rset_first; crt_list != NULL; ){
 
+		if (!crt_list->rset_lock) {
+			last_list = crt_list;
+			crt_list = last_list->rset_next;
+			shm_free(last_list);
+			last_list = NULL;
+			continue;
+		}
+
+		lock_get(crt_list->rset_lock);
 		for(crt_rtpp = crt_list->rn_first; crt_rtpp != NULL;  ){
 
 			if(crt_rtpp->rn_url.s)
@@ -1810,13 +1970,25 @@ static void mod_destroy(void)
 			crt_rtpp = last_rtpp->rn_next;
 			shm_free(last_rtpp);
 		}
-
 		last_list = crt_list;
 		crt_list = last_list->rset_next;
+		lock_release(crt_list->rset_lock);
+
+		lock_destroy(last_list->rset_lock);
+		lock_dealloc((void*)last_list->rset_lock);
+		last_list->rset_lock = NULL;
+
 		shm_free(last_list);
+		last_list = NULL;
 	}
+	lock_release(rtpp_set_list->rset_head_lock);
+
+	lock_destroy(rtpp_set_list->rset_head_lock);
+	lock_dealloc((void*)rtpp_set_list->rset_head_lock);
+	rtpp_set_list->rset_head_lock = NULL;
 
 	shm_free(rtpp_set_list);
+	rtpp_set_list = NULL;
 
 	/* destroy the hastable which keeps the call-id <-> selected_node relation */
 	if (!rtpengine_hash_table_destroy()) {
@@ -2437,8 +2609,15 @@ static struct rtpp_set * select_rtpp_set(int id_set ){
 	struct rtpp_set * rtpp_list;
 	/*is it a valid set_id?*/
 
-	if (!rtpp_set_list || !rtpp_set_list->rset_first) {
-		LM_ERR("no rtp_proxy configured\n");
+	if (!rtpp_set_list) {
+		LM_ERR("no rtpp_set_list\n");
+		return 0;
+	}
+
+	lock_get(rtpp_set_list->rset_head_lock);
+	if (!rtpp_set_list->rset_first) {
+		LM_ERR("no rtpp_set_list->rset_first\n");
+		lock_release(rtpp_set_list->rset_head_lock);
 		return 0;
 	}
 
@@ -2447,6 +2626,7 @@ static struct rtpp_set * select_rtpp_set(int id_set ){
 	if (!rtpp_list) {
 		LM_ERR(" script error-invalid id_set to be selected\n");
 	}
+	lock_release(rtpp_set_list->rset_head_lock);
 
 	return rtpp_list;
 }
@@ -2470,7 +2650,13 @@ select_rtpp_node_new(str callid, str viabranch, int do_test)
 retry:
 	weight_sum = 0;
 
+	lock_get(active_rtpp_set->rset_lock);
 	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
+		/* Select only between displayed machines */
+		if (!node->rn_displayed) {
+			continue;
+		}
+
 		/* Try to enable if it's time to try. */
 		if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()){
 			node->rn_disabled = rtpp_test(node, 1, 0);
@@ -2481,6 +2667,7 @@ retry:
 			weight_sum += node->rn_weight;
 		}
 	}
+	lock_release(active_rtpp_set->rset_lock);
 
 	/* No proxies? Force all to be redetected, if not yet */
 	if (weight_sum == 0) {
@@ -2490,9 +2677,16 @@ retry:
 
 		was_forced = 1;
 
+		lock_get(active_rtpp_set->rset_lock);
 		for(node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
+			/* Select only between displayed machines */
+			if (!node->rn_displayed) {
+				continue;
+			}
+
 			node->rn_disabled = rtpp_test(node, 1, 1);
 		}
+		lock_release(active_rtpp_set->rset_lock);
 
 		goto retry;
 	}
@@ -2503,27 +2697,40 @@ retry:
 	/*
 	 * Scan proxy list and decrease until appropriate proxy is found.
 	 */
+	lock_get(active_rtpp_set->rset_lock);
 	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
+		/* Select only between displayed machines */
+		if (!node->rn_displayed) {
+			continue;
+		}
+
 		/* Select only between enabled machines */
 		if (node->rn_disabled)
 			continue;
 
 		/* Found enabled machine */
-		if (sumcut < node->rn_weight)
+		if (sumcut < node->rn_weight) {
+			lock_release(active_rtpp_set->rset_lock);
 			goto found;
+		}
 
 		/* Update sumcut if enabled machine */
 		sumcut -= node->rn_weight;
 	}
+	lock_release(active_rtpp_set->rset_lock);
 
 	/* No node list */
 	return NULL;
 
 found:
 	if (do_test) {
+		lock_get(active_rtpp_set->rset_lock);
 		node->rn_disabled = rtpp_test(node, node->rn_disabled, 0);
-		if (node->rn_disabled)
+		if (node->rn_disabled) {
+			lock_release(active_rtpp_set->rset_lock);
 			goto retry;
+		}
+		lock_release(active_rtpp_set->rset_lock);
 	}
 
 	/* build the entry */
@@ -2601,8 +2808,22 @@ static struct rtpp_node *
 select_rtpp_node(str callid, str viabranch, int do_test)
 {
 	struct rtpp_node *node = NULL;
+	unsigned int current_rtpp_no;
+
+	lock_get(rtpp_no_lock);
+	current_rtpp_no = *rtpp_no;
+	lock_release(rtpp_no_lock);
+
+	if (rtpp_socks_size != current_rtpp_no) {
+		build_rtpp_socks(current_rtpp_no);
+	}
+
+	if (!active_rtpp_set) {
+		default_rtpp_set = select_rtpp_set(setid_default);
+		active_rtpp_set = default_rtpp_set;
+	}
 
-	if(!active_rtpp_set) {
+	if (!active_rtpp_set) {
 		LM_ERR("script error - no valid set selected\n");
 		return NULL;
 	}
@@ -2771,12 +2992,14 @@ set_rtpengine_set_n(struct sip_msg *msg, rtpp_set_link_t *rtpl, struct rtpp_set
 	}
 	current_msg_id = msg->id;
 
+	lock_get((*out)->rset_lock);
 	node = (*out)->rn_first;
 	while (node != NULL)
 	{
 		if (node->rn_disabled == 0) nb_active_nodes++;
 		node = node->rn_next;
 	}
+	lock_release((*out)->rset_lock);
 
 	if ( nb_active_nodes > 0 )
 	{

+ 2 - 0
modules/rtpengine/rtpengine.h

@@ -53,12 +53,14 @@ struct rtpp_set {
 	struct rtpp_node	*rn_first;
 	struct rtpp_node	*rn_last;
 	struct rtpp_set	 	*rset_next;
+	gen_lock_t		*rset_lock;
 };
 
 
 struct rtpp_set_head {
 	struct rtpp_set		*rset_first;
 	struct rtpp_set		*rset_last;
+	gen_lock_t		*rset_head_lock;
 };
 
 

+ 2 - 2
modules/rtpengine/rtpengine_db.c

@@ -91,6 +91,8 @@ static int rtpp_load_db(void)
 		return -1;
 	}
 
+	rtpengine_delete_node_all();
+
 	n_rows = RES_ROW_N(res);
 	rows = RES_ROWS(res);
 	if (n_rows == 0)
@@ -99,8 +101,6 @@ static int rtpp_load_db(void)
 		return 0;
 	}
 
-	rtpengine_delete_node_all();
-
 	for (i=0; i<n_rows; i++)
 	{
 		values = ROW_VALUES(rows + i);