Bläddra i källkod

Merge branch 'master' of https://github.com/smititelu/kamailio

Richard Fuchs 9 år sedan
förälder
incheckning
c73b9cd8b5

+ 122 - 3
modules/rtpengine/doc/rtpengine_admin.xml

@@ -73,6 +73,38 @@
 		If the set was selected using setid_avp, the avp needs to be
 		set only once before rtpengine_offer() or rtpengine_manage() call.
 	</para>
+	<para>
+		From the current implementation point of view, the sets of rtpproxy nodes
+		are shared memory(shm), so all processes can see a common list of nodes.
+		There is no locking when setting the nodes enabled/disabled (to keep the
+		memory access as fast as possible). Thus, problems related to node state
+		might appear for concurent processes that might set the nodes
+		enabled/disabled(e.g. by fifo command). This robustness problems are overcomed as follows.
+	</para>
+
+	<para>
+		If the current process sees the selected node as disabled, the node is
+		<emphasis>force tested</emphasis> before the current process actually
+		takes the disabled decision. If the test succeeds, the process will set
+		the node as enabled (but other concurrent process might still see it as disabled).
+.
+	</para>
+
+	<para>
+		If the current process sees the selected node as enabled, it does no additional checks
+		and sends the command which will fail in case the machine is actually broken.
+		The process will set the node as disabled (but other concurrent process might still see it as enabled).
+	</para>
+
+	<para>
+		The 'kamctl fifo' commands (including rtpengin ones) are executed by an exclusive
+		process which operate on the same shared memory node list.
+	</para>
+
+	<para>
+		All the nodes are pinged in the beginning by all the processes,
+		even if the node list is shared memory.
+	</para>
 	</section>
 
 	<section>
@@ -175,6 +207,30 @@ modparam("rtpengine", "rtpengine_disable_tout", 20)
 ...
 modparam("rtpengine", "rtpengine_tout_ms", 2000)
 ...
+</programlisting>
+		</example>
+	</section>
+	<section id="rtpengine.p.rtpengine_allow_op">
+		<title><varname>rtpengine_allow_op</varname> (integer)</title>
+		<para>
+		Enable this to allow finishing the current sessions while denying new sessions for the
+		<emphasis>manually deactivated nodes </emphasis> via kamctl command i.e. "disabled(permanent)" nodes.
+		Probably the manually deactivated machine is still running(did not crash).
+		</para>
+		<para>
+		This is <emphasis>useful</emphasis> when deactivating a node for maintanance and reject new sessions but allow current ones to finish.
+		</para>
+		<para>
+		<emphasis>
+		Default value is <quote>0</quote> to keep the current behaviour.
+		</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>rtpengine_allow_op</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("rtpengine", "rtpengine_allow_op", 1)
+...
 </programlisting>
 		</example>
 	</section>
@@ -305,7 +361,7 @@ modparam("rtpengine", "read_sdp_pv", "$var(sdp)")
 route {
 	...
 	$var(sdp) = $rb + "a=foo:bar\r\n";
-	rtpproxy_manage();
+	rtpengine_manage();
 }
 </programlisting>
 		</example>
@@ -330,7 +386,7 @@ modparam("rtpengine", "write_sdp_pv", "$avp(sdp)")
 ...
 route {
 	...
-	rtpproxy_manage();
+	rtpengine_manage();
 	set_body("$avp(sdp)a=baz123\r\n", "application/sdp");
 }
 </programlisting>
@@ -350,12 +406,58 @@ route {
 		<title>Set <varname>rtp_inst_pvar</varname> parameter</title>
 <programlisting format="linespecific">
 ...
-modparam("rtpproxy", "rtp_inst_pvar", "$avp(RTP_INSTANCE)")
+modparam("rtpengine", "rtp_inst_pvar", "$avp(RTP_INSTANCE)")
+...
+</programlisting>
+		</example>
+	</section>
+
+	<section id="rtpengine.p.hash_table_size">
+		<title><varname>hash_table_size</varname> (integer)</title>
+		<para>
+			Size of the hash table. Default value is 256.
+		</para>
+		<para>
+			NOTE: If configured size is <emphasis>less than</emphasis> 1, the size will be defaulted to 1.
+		</para>
+		<example>
+		<title>Set <varname>hash_table_size</varname> parameter</title>
+<programlisting format="linespecific">
+...
+modparam("rtpengine", "hash_table_size", "123")
 ...
 </programlisting>
 		</example>
 	</section>
 
+	<section id="rtpengine.p.hash_table_tout">
+		<title><varname>hash_table_tout</varname> (integer)</title>
+		<para>
+			Number of seconds after an rtpengine hash table entry is marked for deletion.
+			By default, this parameter is set to 3600 (seconds).
+		</para>
+		<para>
+			To maintain information about a selected rtp machine node, for a given call, entries are added in a hashtable of (callid, node) pairs.
+			When command comes, lookup callid. If found, return chosen node. If not found, choose a new node, insert it in the hastable and return the chosen node.
+		</para>
+		<para>
+			NOTE: In the current implementation, the actual deletion happens <emphasis>on the fly</emphasis>,
+			while insert/remove/lookup the hastable, <emphasis>only</emphasis> for the entries in the insert/remove/lookup path.
+		</para>
+		<para>
+			NOTE: When configuring this parameter, one should consider maximum call time VS share memory for unfinished calls.
+		</para>
+		<example>
+		<title>Set <varname>hash_table_tout</varname> parameter</title>
+<programlisting format="linespecific">
+...
+modparam("rtpengine", "hash_table_tout", "300")
+...
+</programlisting>
+		</example>
+	</section>
+
+
 	</section>
 
 	<section>
@@ -964,6 +1066,23 @@ $ &ctltool; fifo nh_ping_rtpp all
 			</programlisting>
 			</example>
 		</section>
+
+	    <section id="rtpengine.m.nh_show_hash_total">
+			<title><function moreinfo="none">nh_show_hash_total</function></title>
+			<para>
+				Print the total number of hash entries in the hash table at a given moment.
+			</para>
+			<example>
+			<title>
+				<function moreinfo="none">nh_show_hash_total</function> usage</title>
+			<programlisting format="linespecific">
+...
+$ &ctltool; fifo nh_show_hash_total
+...
+			</programlisting>
+			</example>
+		</section>
+
 	</section>
 
 </chapter>

+ 250 - 48
modules/rtpengine/rtpengine.c

@@ -78,6 +78,7 @@
 #include "../../modules/tm/tm_load.h"
 #include "rtpengine.h"
 #include "rtpengine_funcs.h"
+#include "rtpengine_hash.h"
 #include "bencode.h"
 
 MODULE_VERSION
@@ -108,6 +109,7 @@ MODULE_VERSION
 #define MI_ENABLE_RTP_PROXY			"nh_enable_rtpp"
 #define MI_SHOW_RTP_PROXIES			"nh_show_rtpp"
 #define MI_PING_RTP_PROXY           "nh_ping_rtpp"
+#define MI_SHOW_HASH_TOTAL          "nh_show_hash_total"
 
 #define MI_RTP_PROXY_NOT_FOUND		"RTP proxy not found"
 #define MI_RTP_PROXY_NOT_FOUND_LEN	(sizeof(MI_RTP_PROXY_NOT_FOUND)-1)
@@ -142,6 +144,10 @@ MODULE_VERSION
 #define MI_SUCCESS_LEN         		(sizeof(MI_SUCCESS)-1)
 #define MI_FAIL                     "fail"
 #define MI_FAIL_LEN         		(sizeof(MI_FAIL)-1)
+#define MI_HASH_ENTRIES				"entries"
+#define MI_HASH_ENTRIES_LEN			(sizeof(MI_HASH_ENTRIES)-1)
+#define MI_HASH_ENTRIES_FAIL		"Fail to get entry details"
+#define MI_HASH_ENTRIES_FAIL_LEN	(sizeof(MI_HASH_ENTRIES_FAIL)-1)
 
 #define MI_FOUND_ALL                   2
 #define MI_FOUND_ONE                   1
@@ -187,7 +193,9 @@ static int rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op
 static int fixup_set_id(void ** param, int param_no);
 static int set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2);
 static struct rtpp_set * select_rtpp_set(int id_set);
-static struct rtpp_node *select_rtpp_node(str, int);
+static struct rtpp_node *select_rtpp_node_new(str, str, int);
+static struct rtpp_node *select_rtpp_node_old(str, str, int);
+static struct rtpp_node *select_rtpp_node(str, str, int);
 static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *);
 static int get_extra_id(struct sip_msg* msg, str *id_str);
 
@@ -210,17 +218,17 @@ static int rtpp_test_ping(struct rtpp_node *node);
 
 /* Pseudo-Variables */
 static int pv_get_rtpstat_f(struct sip_msg *, pv_param_t *, pv_value_t *);
+static int set_rtp_inst_pvar(struct sip_msg *msg, const str * const uri);
 
 /*mi commands*/
-static struct mi_root* mi_enable_rtp_proxy(struct mi_root* cmd_tree,
-		void* param );
-static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree,
-		void* param);
-static struct mi_root* mi_ping_rtp_proxy(struct mi_root* cmd_tree,
-        void* param);
+static struct mi_root* mi_enable_rtp_proxy(struct mi_root* cmd_tree, void* param);
+static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree, void* param);
+static struct mi_root* mi_ping_rtp_proxy(struct mi_root* cmd_tree, void* param);
+static struct mi_root* mi_show_hash_total(struct mi_root* cmd_tree, void* param);
 
 
 static int rtpengine_disable_tout = 60;
+static int rtpengine_allow_op = 0;
 static int rtpengine_retr = 5;
 static int rtpengine_tout_ms = 1000;
 static int queried_nodes_limit = MAX_RTPP_TRIED_NODES;
@@ -228,6 +236,8 @@ static pid_t mypid;
 static unsigned int myseqn = 0;
 static str extra_id_pv_param = {NULL, 0};
 static char *setid_avp_param = NULL;
+static int hash_table_tout = 3600;
+static int hash_table_size = 256;
 
 static char ** rtpp_strings=0;
 static int rtpp_sets=0; /*used in rtpengine_set_store()*/
@@ -326,6 +336,7 @@ static param_export_t params[] = {
 	{"rtpengine_disable_tout",INT_PARAM, &rtpengine_disable_tout },
 	{"rtpengine_retr",        INT_PARAM, &rtpengine_retr         },
 	{"rtpengine_tout_ms",     INT_PARAM, &rtpengine_tout_ms      },
+	{"rtpengine_allow_op",    INT_PARAM, &rtpengine_allow_op     },
 	{"queried_nodes_limit",   INT_PARAM, &queried_nodes_limit    },
 	{"db_url",                PARAM_STR, &rtpp_db_url },
 	{"table_name",            PARAM_STR, &rtpp_table_name },
@@ -336,6 +347,8 @@ static param_export_t params[] = {
 	{"rtp_inst_pvar",         PARAM_STR, &rtp_inst_pv_param },
 	{"write_sdp_pv",          PARAM_STR, &write_sdp_pvar_str          },
 	{"read_sdp_pv",           PARAM_STR, &read_sdp_pvar_str          },
+	{"hash_table_tout",       INT_PARAM, &hash_table_tout        },
+	{"hash_table_size",       INT_PARAM, &hash_table_size        },
 	{0, 0, 0}
 };
 
@@ -343,6 +356,7 @@ static mi_export_t mi_cmds[] = {
 	{MI_ENABLE_RTP_PROXY,     mi_enable_rtp_proxy,  0,  0,  0},
 	{MI_SHOW_RTP_PROXIES,     mi_show_rtp_proxy,    0,  0,  0},
 	{MI_PING_RTP_PROXY,       mi_ping_rtp_proxy,    0,  0,  0},
+	{MI_SHOW_HASH_TOTAL,      mi_show_hash_total,    0,  0,  0},
 	{ 0, 0, 0, 0, 0}
 };
 
@@ -1099,8 +1113,7 @@ error:
 	return -1;
 }
 
-static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree,
-												void* param)
+static struct mi_root* mi_show_rtp_proxy(struct mi_root* cmd_tree, void* param)
 {
 	struct mi_node *node;
 	struct mi_root *root = NULL;
@@ -1189,8 +1202,7 @@ error:
 	return init_mi_tree(404, MI_ERROR, MI_ERROR_LEN);
 }
 
-static struct mi_root* mi_ping_rtp_proxy(struct mi_root* cmd_tree,
-												void* param)
+static struct mi_root* mi_ping_rtp_proxy(struct mi_root* cmd_tree, void* param)
 {
 	struct mi_node *node, *crt_node;
 	struct mi_attr *attr;
@@ -1315,6 +1327,48 @@ error:
 }
 
 
+static struct mi_root* mi_show_hash_total(struct mi_root* cmd_tree, void* param)
+{
+	struct mi_node *node, *crt_node;
+	struct mi_attr *attr;
+	struct mi_root *root = NULL;
+	unsigned int total;
+	str total_str;
+
+	// Init print tree
+	root = init_mi_tree(200, MI_OK_S, MI_OK_LEN);
+	if (!root) {
+		LM_ERR("the MI tree cannot be initialized!\n");
+		return 0;
+	}
+	node = &root->node;
+
+	// Create new node and add it to the roots's kids
+	if(!(crt_node = add_mi_node_child(node, MI_DUP_NAME, "total", strlen("total"), 0, 0))) {
+		LM_ERR("cannot add the child node to the tree\n");
+		goto error;
+	}
+
+	// Get total number of entries
+	total = rtpengine_hash_table_total();
+	total_str.s = int2str(total, &total_str.len);
+
+	// Add node attributes
+	if ((attr = add_mi_attr(crt_node, MI_DUP_VALUE, MI_HASH_ENTRIES, MI_HASH_ENTRIES_LEN, total_str.s, total_str.len)) == 0) {
+		LM_ERR("cannot add attributes to the node\n");
+		goto error;
+	}
+
+	return root;
+
+error:
+	if (root) {
+	    free_mi_tree(root);
+	}
+
+	return init_mi_tree(404, MI_HASH_ENTRIES_FAIL, MI_HASH_ENTRIES_FAIL_LEN);
+}
+
 
 static int
 mod_init(void)
@@ -1440,6 +1494,14 @@ mod_init(void)
 		return -1;
 	}
 
+	/* init the hastable which keeps the call-id <-> selected_node relation */
+	if (!rtpengine_hash_table_init(hash_table_size)) {
+		LM_ERR("rtpengine_hash_table_init(%d) failed!\n", hash_table_size);
+		return -1;
+	} else {
+		LM_DBG("rtpengine_hash_table_init(%d) success!\n", hash_table_size);
+	}
+
 	return 0;
 }
 
@@ -1579,6 +1641,13 @@ static void mod_destroy(void)
 	}
 
 	shm_free(rtpp_set_list);
+
+	/* destroy the hastable which keeps the call-id <-> selected_node relation */
+	if (!rtpengine_hash_table_destroy()) {
+		LM_ERR("rtpengine_hash_table_destroy() failed!\n");
+	} else {
+		LM_DBG("rtpengine_hash_table_destroy() success!\n");
+	}
 }
 
 
@@ -1790,7 +1859,8 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
 {
 	struct ng_flags_parse ng_flags;
 	bencode_item_t *item, *resp;
-	str callid, from_tag, to_tag, body, viabranch, error;
+	str callid = STR_NULL, from_tag = STR_NULL, to_tag = STR_NULL, viabranch = STR_NULL;
+	str body = STR_NULL, error = STR_NULL;
 	int ret, queried_nodes;
 	struct rtpp_node *node;
 	char *cp;
@@ -1923,16 +1993,17 @@ select_node:
 			LM_ERR("queried nodes limit reached\n");
 			goto error;
 		}
-		node = select_rtpp_node(callid, 1);
+		node = select_rtpp_node(callid, viabranch, 1);
 		if (!node) {
 			LM_ERR("no available proxies\n");
 			goto error;
 		}
+
 		cp = send_rtpp_command(node, ng_flags.dict, &ret);
-        if (cp == NULL) {
-    	    node->rn_disabled = 1;
-        	node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;
-        }
+		if (cp == NULL) {
+			node->rn_disabled = 1;
+			node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;
+		}
 	} while (cp == NULL);
 	LM_DBG("proxy reply: %.*s\n", ret, cp);
 
@@ -1944,12 +2015,13 @@ select_node:
 		LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp);
 		goto error;
 	}
+
 	if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
 		if (!bencode_dictionary_get_str(resp, "error-reason", &error)) {
 		    LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
 		}
 		else {
-			if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) && 
+			if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) &&
 			    (strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0))
 			{
 				LM_WARN("proxy %.*s: %.*s", node->rn_url.len, node->rn_url.s , error.len, error.s);
@@ -1957,12 +2029,23 @@ select_node:
 			}
 			LM_ERR("proxy replied with error: %.*s\n", error.len, error.s);
 		}
-        goto error;
+		goto error;
 	}
 
 	if (body_out)
 		*body_out = body;
 
+	if (op == OP_DELETE) {
+		/* Delete the key<->value from the hashtable */
+		if (!rtpengine_hash_table_remove(callid, viabranch)) {
+			LM_ERR("rtpengine hash table failed to remove entry for callen=%d callid=%.*s viabranch=%.*s\n",
+				callid.len, callid.len, callid.s, viabranch.len, viabranch.s);
+		} else {
+			LM_DBG("rtpengine hash table remove entry for callen=%d callid=%.*s viabranch=%.*s\n",
+				callid.len, callid.len, callid.s, viabranch.len, viabranch.s);
+		}
+	}
+
 	return resp;
 
 error:
@@ -2191,81 +2274,199 @@ static struct rtpp_set * select_rtpp_set(int id_set ){
 
 	return rtpp_list;
 }
+
 /*
- * Main balancing routine. This does not try to keep the same proxy for
- * the call if some proxies were disabled or enabled; proxy death considered
- * too rare. Otherwise we should implement "mature" HA clustering, which is
- * too expensive here.
+ * run the selection algorithm and return the new selected node
  */
 static struct rtpp_node *
-select_rtpp_node(str callid, int do_test)
+select_rtpp_node_new(str callid, str viabranch, int do_test)
 {
-	unsigned sum, sumcut, weight_sum;
 	struct rtpp_node* node;
-	int was_forced;
-
-	if(!active_rtpp_set){
-		LM_ERR("script error -no valid set selected\n");
-		return NULL;
-	}
-	/* Most popular case: 1 proxy, nothing to calculate */
-	if (active_rtpp_set->rtpp_node_count == 1) {
-		node = active_rtpp_set->rn_first;
-		if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks())
-			node->rn_disabled = rtpp_test(node, 1, 0);
-		return node->rn_disabled ? NULL : node;
-	}
+	unsigned i, sum, sumcut, weight_sum;
+	int was_forced = 0;
 
 	/* XXX Use quick-and-dirty hashing algo */
-	for(sum = 0; callid.len > 0; callid.len--)
-		sum += callid.s[callid.len - 1];
+	sum = 0;
+	for(i = 0; i < callid.len; i++)
+		sum += callid.s[i];
 	sum &= 0xff;
 
-	was_forced = 0;
 retry:
 	weight_sum = 0;
-	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
 
+	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
+		/* Try to enable if it's time to try. */
 		if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()){
-			/* Try to enable if it's time to try. */
 			node->rn_disabled = rtpp_test(node, 1, 0);
 		}
-		if (!node->rn_disabled)
+
+		/* Select only between enabled machines */
+		if (!node->rn_disabled) {
 			weight_sum += node->rn_weight;
+		}
 	}
+
+	/* No proxies? Force all to be redetected, if not yet */
 	if (weight_sum == 0) {
-		/* No proxies? Force all to be redetected, if not yet */
-		if (was_forced)
+		if (was_forced) {
 			return NULL;
+		}
+
 		was_forced = 1;
+
 		for(node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
 			node->rn_disabled = rtpp_test(node, 1, 1);
 		}
+
 		goto retry;
 	}
+
+	/* sumcut here lays from 0 to weight_sum-1 */
 	sumcut = sum % weight_sum;
+
 	/*
-	 * sumcut here lays from 0 to weight_sum-1.
 	 * Scan proxy list and decrease until appropriate proxy is found.
 	 */
 	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
+		/* Select only between enabled machines */
 		if (node->rn_disabled)
 			continue;
+
+		/* Found enabled machine */
 		if (sumcut < node->rn_weight)
 			goto found;
+
+		/* Update sumcut if enabled machine */
 		sumcut -= node->rn_weight;
 	}
+
 	/* No node list */
 	return NULL;
+
 found:
 	if (do_test) {
 		node->rn_disabled = rtpp_test(node, node->rn_disabled, 0);
 		if (node->rn_disabled)
 			goto retry;
 	}
+
+	/* build the entry */
+	struct rtpengine_hash_entry *entry = shm_malloc(sizeof(struct rtpengine_hash_entry));
+	if (!entry) {
+		LM_ERR("rtpengine hash table fail to create entry for calllen=%d callid=%.*s viabranch=%.*s\n",
+			callid.len, callid.len, callid.s, viabranch.len, viabranch.s);
+		return node;
+	}
+        memset(entry, 0, sizeof(struct rtpengine_hash_entry));
+
+	/* fill the entry */
+        if (callid.s && callid.len > 0) {
+		if (shm_str_dup(&entry->callid, &callid) < 0) {
+			LM_ERR("rtpengine hash table fail to duplicate calllen=%d callid=%.*s\n",
+				callid.len, callid.len, callid.s);
+			rtpengine_hash_table_free_entry(entry);
+			return node;
+		}
+	}
+        if (viabranch.s && viabranch.len > 0) {
+		if (shm_str_dup(&entry->viabranch, &viabranch) < 0) {
+			LM_ERR("rtpengine hash table fail to duplicate calllen=%d viabranch=%.*s\n",
+				callid.len, viabranch.len, viabranch.s);
+			rtpengine_hash_table_free_entry(entry);
+			return node;
+		}
+	}
+	entry->node = node;
+	entry->next = NULL;
+	entry->tout = get_ticks() + hash_table_tout;
+
+	/* insert the key<->entry from the hashtable */
+	if (!rtpengine_hash_table_insert(callid, viabranch, entry)) {
+		LM_ERR("rtpengine hash table fail to insert node=%.*s for calllen=%d callid=%.*s viabranch=%.*s\n",
+			node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s, viabranch.len, viabranch.s);
+		rtpengine_hash_table_free_entry(entry);
+		return node;
+	} else {
+		LM_DBG("rtpengine hash table insert node=%.*s for calllen=%d callid=%.*s viabranch=%.*s\n",
+			node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s, viabranch.len, viabranch.s);
+	}
+
+	/* return selected node */
 	return node;
 }
 
+/*
+ * lookup the hastable (key=callid value=node) and get the old node (e.g. for answer/delete)
+ */
+static struct rtpp_node *
+select_rtpp_node_old(str callid, str viabranch, int do_test)
+{
+	struct rtpp_node *node = NULL;
+
+	node = rtpengine_hash_table_lookup(callid, viabranch);
+
+	if (!node) {
+		LM_NOTICE("rtpengine hash table lookup failed to find node for calllen=%d callid=%.*s viabranch=%.*s\n",
+			callid.len, callid.len, callid.s, viabranch.len, viabranch.s);
+		return NULL;
+	} else {
+		LM_DBG("rtpengine hash table lookup find node=%.*s for calllen=%d callid=%.*s viabranch=%.*s\n",
+			node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s, viabranch.len, viabranch.s);
+	}
+
+	return node;
+}
+
+/*
+ * Main balancing routine. This DO try to keep the same proxy for
+ * the call if some proxies were disabled or enabled (e.g. kamctl command)
+ */
+static struct rtpp_node *
+select_rtpp_node(str callid, str viabranch, int do_test)
+{
+	struct rtpp_node *node = NULL;
+
+	if(!active_rtpp_set) {
+		LM_ERR("script error - no valid set selected\n");
+		return NULL;
+	}
+
+	// lookup node
+	node = select_rtpp_node_old(callid, viabranch, do_test);
+
+	// check node
+	if (!node) {
+		// run the selection algorithm
+		node = select_rtpp_node_new(callid, viabranch, do_test);
+
+		// check node
+		if (!node) {
+			LM_ERR("rtpengine failed to select new for calllen=%d callid=%.*s\n",
+				callid.len, callid.len, callid.s);
+			return NULL;
+		}
+	}
+
+	// if node enabled, return it
+	if (!node->rn_disabled) {
+		return node;
+	}
+
+	// if proper configuration and node manually or timeout disabled, return it
+	if (rtpengine_allow_op) {
+		if (node->rn_recheck_ticks == MI_MAX_RECHECK_TICKS) {
+			LM_DBG("node=%.*s for calllen=%d callid=%.*s is disabled(permanent) (probably still UP)! Return it\n",
+				node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
+		} else {
+			LM_DBG("node=%.*s for calllen=%d callid=%.*s is disabled, either broke or timeout disabled! Return it\n",
+				node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
+		}
+		return node;
+	}
+
+	return NULL;
+}
+
 static int
 get_extra_id(struct sip_msg* msg, str *id_str) {
 	if(msg==NULL || extra_id_pv==NULL || id_str==NULL) {
@@ -2682,7 +2883,8 @@ pv_get_rtpstat_f(struct sip_msg *msg, pv_param_t *param,
 	return rtpengine_rtpp_set_wrap(msg, rtpengine_rtpstat_wrap, parms, 1);
 }
 
-int set_rtp_inst_pvar(struct sip_msg *msg, const str * const uri) {
+static int
+set_rtp_inst_pvar(struct sip_msg *msg, const str * const uri) {
 	pv_value_t val;
 
 	if (rtp_inst_pvar == NULL)

+ 0 - 1
modules/rtpengine/rtpengine.h

@@ -61,7 +61,6 @@ struct rtpp_set_head{
 struct rtpp_set *get_rtpp_set(int set_id);
 int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy);
 
-int set_rtp_inst_pvar(struct sip_msg *msg, const str * const uri);
 
 int init_rtpproxy_db(void);
 

+ 544 - 0
modules/rtpengine/rtpengine_hash.c

@@ -0,0 +1,544 @@
+#include "rtpengine_hash.h"
+
+#include "../../str.h"
+#include "../../dprint.h"
+#include "../../mem/shm_mem.h"
+#include "../../locking.h"
+#include "../../timer.h"
+
+static void rtpengine_hash_table_free_row_lock(gen_lock_t *row_lock);
+
+
+static struct rtpengine_hash_table *rtpengine_hash_table;
+
+/* from sipwise rtpengine */
+static int str_cmp_str(const str a, const str b) {
+	if (a.len < b.len)
+		return -1;
+	if (a.len > b.len)
+		return 1;
+	if (a.len == 0 && b.len == 0)
+		return 0;
+	return memcmp(a.s, b.s, a.len);
+}
+
+/* from sipwise rtpengine */
+static int str_equal(str a, str b) {
+	return (str_cmp_str(a, b) == 0);
+}
+
+/* from sipwise rtpengine */
+static unsigned int str_hash(str s) {
+	unsigned int ret = 5381;
+	str it = s;
+
+	while (it.len > 0) {
+		ret = (ret << 5) + ret + *it.s;
+		it.s++;
+		it.len--;
+	}
+
+	return ret % rtpengine_hash_table->size;
+}
+
+/* rtpengine hash API */
+int rtpengine_hash_table_init(int size) {
+	int i;
+	int hash_table_size;
+
+	// init hash table size
+	if (size < 1) {
+		hash_table_size = 1;
+	} else {
+		hash_table_size = size;
+	}
+	LM_DBG("rtpengine_hash_table size = %d\n", hash_table_size);
+
+	// init hashtable
+	rtpengine_hash_table = shm_malloc(sizeof(struct rtpengine_hash_table));
+	if (!rtpengine_hash_table) {
+		LM_ERR("no shm left to create rtpengine_hash_table\n");
+		return 0;
+	}
+	memset(rtpengine_hash_table, 0, sizeof(struct rtpengine_hash_table));
+	rtpengine_hash_table->size = hash_table_size;
+
+	// init hashtable row_locks
+	rtpengine_hash_table->row_locks = shm_malloc(hash_table_size * sizeof(gen_lock_t*));
+	if (!rtpengine_hash_table->row_locks) {
+		LM_ERR("no shm left to create rtpengine_hash_table->row_locks\n");
+		rtpengine_hash_table_destroy();
+		return 0;
+	}
+	memset(rtpengine_hash_table->row_locks, 0, hash_table_size * sizeof(gen_lock_t*));
+
+	// init hashtable row_entry_list
+	rtpengine_hash_table->row_entry_list = shm_malloc(rtpengine_hash_table->size * sizeof(struct rtpengine_hash_entry*));
+	if (!rtpengine_hash_table->row_entry_list) {
+		LM_ERR("no shm left to create rtpengine_hash_table->row_entry_list\n");
+		rtpengine_hash_table_destroy();
+		return 0;
+	}
+	memset(rtpengine_hash_table->row_entry_list, 0, rtpengine_hash_table->size * sizeof(struct rtpengine_hash_entry*));
+
+	// init hashtable row_totals
+	rtpengine_hash_table->row_totals = shm_malloc(hash_table_size * sizeof(unsigned int));
+	if (!rtpengine_hash_table->row_totals) {
+		LM_ERR("no shm left to create rtpengine_hash_table->row_totals\n");
+		rtpengine_hash_table_destroy();
+		return 0;
+	}
+	memset(rtpengine_hash_table->row_totals, 0, hash_table_size * sizeof(unsigned int));
+
+	// init hashtable  row_locks[i], row_entry_list[i] and row_totals[i]
+	for (i = 0; i < hash_table_size; i++) {
+		// init hashtable row_locks[i]
+		rtpengine_hash_table->row_locks[i] = lock_alloc();
+		if (!rtpengine_hash_table->row_locks[i]) {
+			LM_ERR("no shm left to create rtpengine_hash_table->row_locks[%d]\n", i);
+			rtpengine_hash_table_destroy();
+			return 0;
+		}
+
+		// init hashtable row_entry_list[i]
+		rtpengine_hash_table->row_entry_list[i] = shm_malloc(sizeof(struct rtpengine_hash_entry));
+		if (!rtpengine_hash_table->row_entry_list[i]) {
+			LM_ERR("no shm left to create rtpengine_hash_table->row_entry_list[%d]\n", i);
+			rtpengine_hash_table_destroy();
+			return 0;
+		}
+		memset(rtpengine_hash_table->row_entry_list[i], 0, sizeof(struct rtpengine_hash_entry));
+
+		rtpengine_hash_table->row_entry_list[i]->tout = -1;
+		rtpengine_hash_table->row_entry_list[i]->next = NULL;
+
+		// init hashtable row_totals[i]
+		rtpengine_hash_table->row_totals[i] = 0;
+	}
+
+	return 1;
+}
+
+int rtpengine_hash_table_destroy() {
+	int i;
+
+	// check rtpengine hashtable
+	if (!rtpengine_hash_table) {
+		LM_ERR("NULL rtpengine_hash_table\n");
+		return 1;
+	}
+
+	// check rtpengine hashtable->row_locks
+	if (!rtpengine_hash_table->row_locks) {
+		LM_ERR("NULL rtpengine_hash_table->row_locks\n");
+		shm_free(rtpengine_hash_table);
+		rtpengine_hash_table = NULL;
+		return 1;
+	}
+
+	// destroy hashtable content
+	for (i = 0; i < rtpengine_hash_table->size; i++) {
+		// lock
+		if (!rtpengine_hash_table->row_locks[i]) {
+			LM_ERR("NULL rtpengine_hash_table->row_locks[%d]\n", i);
+			continue;
+		} else {
+			lock_get(rtpengine_hash_table->row_locks[i]);
+		}
+
+		// check rtpengine hashtable->row_entry_list
+		if (!rtpengine_hash_table->row_entry_list) {
+			LM_ERR("NULL rtpengine_hash_table->row_entry_list\n");
+		} else {
+			// destroy hashtable row_entry_list[i]
+			rtpengine_hash_table_free_row_entry_list(rtpengine_hash_table->row_entry_list[i]);
+			rtpengine_hash_table->row_entry_list[i] = NULL;
+		}
+
+		// unlock
+		lock_release(rtpengine_hash_table->row_locks[i]);
+
+		// destroy hashtable row_locks[i]
+		rtpengine_hash_table_free_row_lock(rtpengine_hash_table->row_locks[i]);
+		rtpengine_hash_table->row_locks[i] = NULL;
+	}
+
+	// destroy hashtable row_entry_list
+	if (!rtpengine_hash_table->row_entry_list) {
+		LM_ERR("NULL rtpengine_hash_table->row_entry_list\n");
+	} else {
+		shm_free(rtpengine_hash_table->row_entry_list);
+		rtpengine_hash_table->row_entry_list = NULL;
+	}
+
+	// destroy hashtable row_totals
+	if (!rtpengine_hash_table->row_totals) {
+		LM_ERR("NULL rtpengine_hash_table->row_totals\n");
+	} else {
+		shm_free(rtpengine_hash_table->row_totals);
+		rtpengine_hash_table->row_totals = NULL;
+	}
+
+	// destroy hashtable row_locks
+	if (!rtpengine_hash_table->row_locks) {
+		// should not be the case; just for code symmetry
+		LM_ERR("NULL rtpengine_hash_table->row_locks\n");
+	} else {
+		shm_free(rtpengine_hash_table->row_locks);
+		rtpengine_hash_table->row_locks = NULL;
+	}
+
+	// destroy hashtable
+	if (!rtpengine_hash_table) {
+		// should not be the case; just for code symmetry
+		LM_ERR("NULL rtpengine_hash_table\n");
+	} else {
+		shm_free(rtpengine_hash_table);
+		rtpengine_hash_table = NULL;
+	}
+
+	return 1;
+}
+
+int rtpengine_hash_table_insert(str callid, str viabranch, struct rtpengine_hash_entry *value) {
+	struct rtpengine_hash_entry *entry, *last_entry;
+	struct rtpengine_hash_entry *new_entry = (struct rtpengine_hash_entry *) value;
+	unsigned int hash_index;
+
+	// sanity checks
+	if (!rtpengine_hash_table_sanity_checks()) {
+		LM_ERR("sanity checks failed\n");
+		return 0;
+	}
+
+	// get entry list
+	hash_index = str_hash(callid);
+	entry = rtpengine_hash_table->row_entry_list[hash_index];
+	last_entry = entry;
+
+	// lock
+	if (rtpengine_hash_table->row_locks[hash_index]) {
+		lock_get(rtpengine_hash_table->row_locks[hash_index]);
+	} else {
+		LM_ERR("NULL rtpengine_hash_table->row_locks[%d]\n", hash_index);
+		return 0;
+	}
+
+	while (entry) {
+		// if found, don't add new entry
+		if (str_equal(entry->callid, new_entry->callid) &&
+		    str_equal(entry->viabranch, new_entry->viabranch)) {
+			// unlock
+			lock_release(rtpengine_hash_table->row_locks[hash_index]);
+			LM_NOTICE("callid=%.*s, viabranch=%.*s already in hashtable, ignore new value\n",
+				entry->callid.len, entry->callid.s,
+				entry->viabranch.len, entry->viabranch.s);
+			return 0;
+		}
+
+		// if expired entry discovered, delete it
+		if (entry->tout < get_ticks()) {
+			// set pointers; exclude entry
+			last_entry->next = entry->next;
+
+			// free current entry; entry points to unknown
+			rtpengine_hash_table_free_entry(entry);
+
+			// set pointers
+			entry = last_entry;
+
+			// update total
+			rtpengine_hash_table->row_totals[hash_index]--;
+		}
+
+		// next entry in the list
+		last_entry = entry;
+		entry = entry->next;
+	}
+
+	last_entry->next = new_entry;
+
+	// update total
+	rtpengine_hash_table->row_totals[hash_index]++;
+
+	// unlock
+	lock_release(rtpengine_hash_table->row_locks[hash_index]);
+
+	return 1;
+}
+
+int rtpengine_hash_table_remove(str callid, str viabranch) {
+	struct rtpengine_hash_entry *entry, *last_entry;
+	unsigned int hash_index;
+
+	// sanity checks
+	if (!rtpengine_hash_table_sanity_checks()) {
+		LM_ERR("sanity checks failed\n");
+		return 0;
+	}
+
+	// get first entry from entry list; jump over unused list head
+	hash_index = str_hash(callid);
+	entry = rtpengine_hash_table->row_entry_list[hash_index];
+	last_entry = entry;
+
+	// lock
+	if (rtpengine_hash_table->row_locks[hash_index]) {
+		lock_get(rtpengine_hash_table->row_locks[hash_index]);
+	} else {
+		LM_ERR("NULL rtpengine_hash_table->row_locks[%d]\n", hash_index);
+		return 0;
+	}
+
+	while (entry) {
+		// if callid found, delete entry
+		if (str_equal(entry->callid, callid) &&
+		    str_equal(entry->viabranch, viabranch)) {
+			// free entry
+			last_entry->next = entry->next;
+			rtpengine_hash_table_free_entry(entry);
+
+			// update total
+			rtpengine_hash_table->row_totals[hash_index]--;
+
+			// unlock
+			lock_release(rtpengine_hash_table->row_locks[hash_index]);
+
+			return 1;
+		}
+
+		// if expired entry discovered, delete it
+		if (entry->tout < get_ticks()) {
+			// set pointers; exclude entry
+			last_entry->next = entry->next;
+
+			// free current entry; entry points to unknown
+			rtpengine_hash_table_free_entry(entry);
+
+			// set pointers
+			entry = last_entry;
+
+			// update total
+			rtpengine_hash_table->row_totals[hash_index]--;
+		}
+
+		last_entry = entry;
+		entry = entry->next;
+	}
+
+	// unlock
+	lock_release(rtpengine_hash_table->row_locks[hash_index]);
+
+	return 0;
+}
+
+struct rtpp_node *rtpengine_hash_table_lookup(str callid, str viabranch) {
+	struct rtpengine_hash_entry *entry, *last_entry;
+	unsigned int hash_index;
+	struct rtpp_node *node;
+
+	// sanity checks
+	if (!rtpengine_hash_table_sanity_checks()) {
+		LM_ERR("sanity checks failed\n");
+		return 0;
+	}
+
+	// get first entry from entry list; jump over unused list head
+	hash_index = str_hash(callid);
+	entry = rtpengine_hash_table->row_entry_list[hash_index];
+	last_entry = entry;
+
+	// lock
+	if (rtpengine_hash_table->row_locks[hash_index]) {
+		lock_get(rtpengine_hash_table->row_locks[hash_index]);
+	} else {
+		LM_ERR("NULL rtpengine_hash_table->row_locks[%d]\n", hash_index);
+		return 0;
+	}
+
+	while (entry) {
+		// if callid found, return entry
+		if (str_equal(entry->callid, callid) &&
+		    str_equal(entry->viabranch, viabranch)) {
+			node = entry->node;
+
+			// unlock
+			lock_release(rtpengine_hash_table->row_locks[hash_index]);
+
+			return node;
+		}
+
+		// if expired entry discovered, delete it
+		if (entry->tout < get_ticks()) {
+			// set pointers; exclude entry
+			last_entry->next = entry->next;
+
+			// free current entry; entry points to unknown
+			rtpengine_hash_table_free_entry(entry);
+
+			// set pointers
+			entry = last_entry;
+
+			// update total
+			rtpengine_hash_table->row_totals[hash_index]--;
+		}
+
+		last_entry = entry;
+		entry = entry->next;
+	}
+
+	// unlock
+	lock_release(rtpengine_hash_table->row_locks[hash_index]);
+
+	return NULL;
+}
+
+// print hash table entries while deleting expired entries
+void rtpengine_hash_table_print() {
+	int i;
+	struct rtpengine_hash_entry *entry, *last_entry;
+
+	// sanity checks
+	if (!rtpengine_hash_table_sanity_checks()) {
+		LM_ERR("sanity checks failed\n");
+		return ;
+	}
+
+	// print hashtable
+	for (i = 0; i < rtpengine_hash_table->size; i++) {
+		// lock
+		if (rtpengine_hash_table->row_locks[i]) {
+			lock_get(rtpengine_hash_table->row_locks[i]);
+		} else {
+			LM_ERR("NULL rtpengine_hash_table->row_locks[%d]\n", i);
+			return ;
+		}
+
+		entry = rtpengine_hash_table->row_entry_list[i];
+		last_entry = entry;
+
+		while (entry) {
+			// if expired entry discovered, delete it
+			if (entry->tout < get_ticks()) {
+				// set pointers; exclude entry
+				last_entry->next = entry->next;
+
+				// free current entry; entry points to unknown
+				rtpengine_hash_table_free_entry(entry);
+
+				// set pointers
+				entry = last_entry;
+
+				// update total
+				rtpengine_hash_table->row_totals[i]--;
+			} else {
+				LM_DBG("hash_index=%d callid=%.*s tout=%u\n",
+					i, entry->callid.len, entry->callid.s, entry->tout - get_ticks());
+			}
+
+			last_entry = entry;
+			entry = entry->next;
+		}
+
+		// unlock
+		lock_release(rtpengine_hash_table->row_locks[i]);
+	}
+
+}
+
+unsigned int rtpengine_hash_table_total() {
+	int i;
+	unsigned int total = 0;
+
+	// sanity checks
+	if (!rtpengine_hash_table_sanity_checks()) {
+		LM_ERR("sanity checks failed\n");
+		return 0;
+	}
+
+	for (i = 0; i < rtpengine_hash_table->size; i++) {
+		total += rtpengine_hash_table->row_totals[i];
+	}
+
+	return total;
+}
+
+void rtpengine_hash_table_free_entry(struct rtpengine_hash_entry *entry) {
+	if (!entry) {
+		LM_ERR("try to free a NULL entry\n");
+		return ;
+	}
+
+	// free callid
+	if (entry->callid.s) {
+		shm_free(entry->callid.s);
+	}
+
+	// free viabranch
+	if (entry->viabranch.s) {
+		shm_free(entry->viabranch.s);
+	}
+
+	// free entry
+	shm_free(entry);
+
+	return ;
+}
+
+void rtpengine_hash_table_free_row_entry_list(struct rtpengine_hash_entry *row_entry_list) {
+	struct rtpengine_hash_entry *entry, *last_entry;
+
+	if (!row_entry_list) {
+		LM_ERR("try to free a NULL row_entry_list\n");
+		return ;
+	}
+
+	entry = row_entry_list;
+	while (entry) {
+		last_entry = entry;
+		entry = entry->next;
+		rtpengine_hash_table_free_entry(last_entry);
+		last_entry = NULL;
+	}
+
+	return ;
+}
+
+static void rtpengine_hash_table_free_row_lock(gen_lock_t *row_lock) {
+	if (!row_lock) {
+		LM_ERR("try to free a NULL lock\n");
+		return ;
+	}
+
+	lock_destroy(row_lock);
+
+	return ;
+}
+
+int rtpengine_hash_table_sanity_checks() {
+	// check rtpengine hashtable
+	if (!rtpengine_hash_table) {
+		LM_ERR("NULL rtpengine_hash_table\n");
+		return 0;
+	}
+
+	// check rtpengine hashtable->row_locks
+	if (!rtpengine_hash_table->row_locks) {
+		LM_ERR("NULL rtpengine_hash_table->row_locks\n");
+		return 0;
+	}
+
+	// check rtpengine hashtable->row_entry_list
+	if (!rtpengine_hash_table->row_entry_list) {
+		LM_ERR("NULL rtpengine_hash_table->row_entry_list\n");
+		return 0;
+	}
+
+	// check rtpengine hashtable->row_totals
+	if (!rtpengine_hash_table->row_totals) {
+		LM_ERR("NULL rtpengine_hash_table->row_totals\n");
+		return 0;
+	}
+
+	return 1;
+}

+ 40 - 0
modules/rtpengine/rtpengine_hash.h

@@ -0,0 +1,40 @@
+#ifndef _RTPENGINE_HASH_H
+#define _RTPENGINE_HASH_H
+
+#include "../../str.h"
+#include "../../locking.h"
+
+
+/* table entry */
+struct rtpengine_hash_entry {
+	str callid;				// call callid
+	str viabranch;				// call viabranch
+	struct rtpp_node *node;			// call selected node
+
+	unsigned int tout;			// call timeout
+	struct rtpengine_hash_entry *next;	// call next
+};
+
+/* table */
+struct rtpengine_hash_table {
+	struct rtpengine_hash_entry **row_entry_list;	// vector of size pointers to entry
+	gen_lock_t **row_locks;				// vector of size pointers to locks
+	unsigned int *row_totals;			// vector of size numbers of entries in the hashtable rows
+	unsigned int size;				// hash table size
+};
+
+
+int rtpengine_hash_table_init(int size);
+int rtpengine_hash_table_destroy();
+int rtpengine_hash_table_insert(str callid, str viabranch, struct rtpengine_hash_entry *value);
+int rtpengine_hash_table_remove(str callid, str viabranch);
+struct rtpp_node *rtpengine_hash_table_lookup(str callid, str viabranch);
+void rtpengine_hash_table_print();
+unsigned int rtpengine_hash_table_total();
+
+void rtpengine_hash_table_free_entry(struct rtpengine_hash_entry *entry);
+void rtpengine_hash_table_free_row_entry_list(struct rtpengine_hash_entry *row_entry_list);
+
+int rtpengine_hash_table_sanity_checks();
+
+#endif