瀏覽代碼

rtpengine: hash table to keep the selected nodes

Shared memory hash table with global hashtable lock.
Add state maintaining the selected rtp node, for a given callid.
Hashtable entry expiration time configurable using hash_entry_tout modparam.
The actual deletion happens on the fly while insert/remove/lookup are called.
Updated doku.
Stefan Mititelu 10 年之前
父節點
當前提交
2625ab3ccd

+ 26 - 0
modules/rtpengine/doc/rtpengine_admin.xml

@@ -356,6 +356,32 @@ modparam("rtpproxy", "rtp_inst_pvar", "$avp(RTP_INSTANCE)")
 		</example>
 		</example>
 	</section>
 	</section>
 
 
+	<section id="rtpengine.p.hash_entry_tout">
+		<title><varname>hash_entry_tout</varname> (string)</title>
+		<para>
+			Number of seconds after an rtpengine hash table entry is marked for deletion.
+			By default, this parameter is set to 120 (seconds).
+		</para>
+		<para>
+			To maintain information about a selected rtp machine node, for a given call, entries are added in a hashtable of (callid, node) pairs.
+			When "offer" comes, insert new entry in the hastable.
+			When subsequent commands come, lookup callid and return chosen node.
+			When "delete" comes, remove old entry from hashtable.
+		</para>
+		<para>
+			NOTE: In the current implementation, the actual deletion happens <emphasis>on the fly</emphasis>,
+			while insert/remove/lookup the hastable, <emphasis>only</emphasis> for the entries in the insert/remove/lookup path.
+		</para>
+		<example>
+		<title>Set <varname>hash_entry_tout</varname> parameter</title>
+<programlisting format="linespecific">
+...
+modparam("rtpproxy", "hash_entry_tout", "300")
+...
+</programlisting>
+		</example>
+	</section>
+
 	</section>
 	</section>
 
 
 	<section>
 	<section>

+ 153 - 36
modules/rtpengine/rtpengine.c

@@ -78,6 +78,7 @@
 #include "../../modules/tm/tm_load.h"
 #include "../../modules/tm/tm_load.h"
 #include "rtpengine.h"
 #include "rtpengine.h"
 #include "rtpengine_funcs.h"
 #include "rtpengine_funcs.h"
+#include "rtpengine_hash.h"
 #include "bencode.h"
 #include "bencode.h"
 
 
 MODULE_VERSION
 MODULE_VERSION
@@ -187,7 +188,9 @@ static int rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op
 static int fixup_set_id(void ** param, int param_no);
 static int fixup_set_id(void ** param, int param_no);
 static int set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2);
 static int set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2);
 static struct rtpp_set * select_rtpp_set(int id_set);
 static struct rtpp_set * select_rtpp_set(int id_set);
-static struct rtpp_node *select_rtpp_node(str, int);
+static struct rtpp_node *select_rtpp_node_new(str, int, int);
+static struct rtpp_node *select_rtpp_node_old(str, int, int);
+static struct rtpp_node *select_rtpp_node(str, int, int);
 static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *);
 static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *);
 static int get_extra_id(struct sip_msg* msg, str *id_str);
 static int get_extra_id(struct sip_msg* msg, str *id_str);
 
 
@@ -228,6 +231,7 @@ static pid_t mypid;
 static unsigned int myseqn = 0;
 static unsigned int myseqn = 0;
 static str extra_id_pv_param = {NULL, 0};
 static str extra_id_pv_param = {NULL, 0};
 static char *setid_avp_param = NULL;
 static char *setid_avp_param = NULL;
+static int hash_entry_tout = 120;
 
 
 static char ** rtpp_strings=0;
 static char ** rtpp_strings=0;
 static int rtpp_sets=0; /*used in rtpengine_set_store()*/
 static int rtpp_sets=0; /*used in rtpengine_set_store()*/
@@ -336,6 +340,7 @@ static param_export_t params[] = {
 	{"rtp_inst_pvar",         PARAM_STR, &rtp_inst_pv_param },
 	{"rtp_inst_pvar",         PARAM_STR, &rtp_inst_pv_param },
 	{"write_sdp_pv",          PARAM_STR, &write_sdp_pvar_str          },
 	{"write_sdp_pv",          PARAM_STR, &write_sdp_pvar_str          },
 	{"read_sdp_pv",           PARAM_STR, &read_sdp_pvar_str          },
 	{"read_sdp_pv",           PARAM_STR, &read_sdp_pvar_str          },
+	{"hash_entry_tout",       INT_PARAM, &hash_entry_tout        },
 	{0, 0, 0}
 	{0, 0, 0}
 };
 };
 
 
@@ -1440,6 +1445,14 @@ mod_init(void)
 		return -1;
 		return -1;
 	}
 	}
 
 
+	/* init the hastable which keeps the call-id <-> selected_node relation */
+	if (!rtpengine_hash_table_init()) {
+		LM_ERR("rtpengine_hash_table_init() failed!\n");
+		return -1;
+	} else {
+		LM_DBG("rtpengine_hash_table_init() success!\n");
+	}
+
 	return 0;
 	return 0;
 }
 }
 
 
@@ -1579,6 +1592,13 @@ static void mod_destroy(void)
 	}
 	}
 
 
 	shm_free(rtpp_set_list);
 	shm_free(rtpp_set_list);
+
+	/* destroy the hastable which keeps the call-id <-> selected_node relation */
+	if (!rtpengine_hash_table_destroy()) {
+		LM_ERR("rtpengine_hash_table_destroy() failed!\n");
+	} else {
+		LM_DBG("rtpengine_hash_table_destroy() success!\n");
+	}
 }
 }
 
 
 
 
@@ -1923,16 +1943,17 @@ select_node:
 			LM_ERR("queried nodes limit reached\n");
 			LM_ERR("queried nodes limit reached\n");
 			goto error;
 			goto error;
 		}
 		}
-		node = select_rtpp_node(callid, 1);
+		node = select_rtpp_node(callid, 1, op);
 		if (!node) {
 		if (!node) {
 			LM_ERR("no available proxies\n");
 			LM_ERR("no available proxies\n");
 			goto error;
 			goto error;
 		}
 		}
+
 		cp = send_rtpp_command(node, ng_flags.dict, &ret);
 		cp = send_rtpp_command(node, ng_flags.dict, &ret);
-        if (cp == NULL) {
-    	    node->rn_disabled = 1;
-        	node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;
-        }
+		if (cp == NULL) {
+			node->rn_disabled = 1;
+			node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;
+		}
 	} while (cp == NULL);
 	} while (cp == NULL);
 	LM_DBG("proxy reply: %.*s\n", ret, cp);
 	LM_DBG("proxy reply: %.*s\n", ret, cp);
 
 
@@ -1944,12 +1965,13 @@ select_node:
 		LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp);
 		LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp);
 		goto error;
 		goto error;
 	}
 	}
+
 	if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
 	if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
 		if (!bencode_dictionary_get_str(resp, "error-reason", &error)) {
 		if (!bencode_dictionary_get_str(resp, "error-reason", &error)) {
 		    LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
 		    LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
 		}
 		}
 		else {
 		else {
-			if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) && 
+			if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) &&
 			    (strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0))
 			    (strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0))
 			{
 			{
 				LM_WARN("proxy %.*s: %.*s", node->rn_url.len, node->rn_url.s , error.len, error.s);
 				LM_WARN("proxy %.*s: %.*s", node->rn_url.len, node->rn_url.s , error.len, error.s);
@@ -1957,12 +1979,23 @@ select_node:
 			}
 			}
 			LM_ERR("proxy replied with error: %.*s\n", error.len, error.s);
 			LM_ERR("proxy replied with error: %.*s\n", error.len, error.s);
 		}
 		}
-        goto error;
+		goto error;
 	}
 	}
 
 
 	if (body_out)
 	if (body_out)
 		*body_out = body;
 		*body_out = body;
 
 
+	if (op == OP_DELETE) {
+		/* Delete the key<->value from the hashtable */
+		if (!rtpengine_hash_table_remove(&callid)) {
+			LM_ERR("rtpengine hash table failed to remove entry for callen=%d callid=%.*s\n",
+				callid.len, callid.len, callid.s);
+		} else {
+			LM_DBG("rtpengine hash table remove entry for callen=%d callid=%.*s\n",
+				callid.len, callid.len, callid.s);
+		}
+	}
+
 	return resp;
 	return resp;
 
 
 error:
 error:
@@ -2191,81 +2224,165 @@ static struct rtpp_set * select_rtpp_set(int id_set ){
 
 
 	return rtpp_list;
 	return rtpp_list;
 }
 }
+
 /*
 /*
- * Main balancing routine. This does not try to keep the same proxy for
- * the call if some proxies were disabled or enabled; proxy death considered
- * too rare. Otherwise we should implement "mature" HA clustering, which is
- * too expensive here.
+ * run the selection algorithm and return the new selected node
  */
  */
 static struct rtpp_node *
 static struct rtpp_node *
-select_rtpp_node(str callid, int do_test)
+select_rtpp_node_new(str callid, int do_test, int op)
 {
 {
-	unsigned sum, sumcut, weight_sum;
 	struct rtpp_node* node;
 	struct rtpp_node* node;
-	int was_forced;
-
-	if(!active_rtpp_set){
-		LM_ERR("script error -no valid set selected\n");
-		return NULL;
-	}
-	/* Most popular case: 1 proxy, nothing to calculate */
-	if (active_rtpp_set->rtpp_node_count == 1) {
-		node = active_rtpp_set->rn_first;
-		if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks())
-			node->rn_disabled = rtpp_test(node, 1, 0);
-		return node->rn_disabled ? NULL : node;
-	}
+	unsigned i, sum, sumcut, weight_sum;
+	int was_forced = 0;
 
 
 	/* XXX Use quick-and-dirty hashing algo */
 	/* XXX Use quick-and-dirty hashing algo */
-	for(sum = 0; callid.len > 0; callid.len--)
-		sum += callid.s[callid.len - 1];
+	for(i = 0; i < callid.len; i++)
+		sum += callid.s[i];
 	sum &= 0xff;
 	sum &= 0xff;
 
 
-	was_forced = 0;
 retry:
 retry:
 	weight_sum = 0;
 	weight_sum = 0;
-	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
 
 
+	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
+		/* Try to enable if it's time to try. */
 		if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()){
 		if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()){
-			/* Try to enable if it's time to try. */
 			node->rn_disabled = rtpp_test(node, 1, 0);
 			node->rn_disabled = rtpp_test(node, 1, 0);
 		}
 		}
-		if (!node->rn_disabled)
+
+		/* Select only between enabled machines */
+		if (!node->rn_disabled) {
 			weight_sum += node->rn_weight;
 			weight_sum += node->rn_weight;
+		}
 	}
 	}
+
+	/* No proxies? Force all to be redetected, if not yet */
 	if (weight_sum == 0) {
 	if (weight_sum == 0) {
-		/* No proxies? Force all to be redetected, if not yet */
-		if (was_forced)
+		if (was_forced) {
 			return NULL;
 			return NULL;
+		}
+
 		was_forced = 1;
 		was_forced = 1;
+
 		for(node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
 		for(node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
 			node->rn_disabled = rtpp_test(node, 1, 1);
 			node->rn_disabled = rtpp_test(node, 1, 1);
 		}
 		}
+
 		goto retry;
 		goto retry;
 	}
 	}
+
+	/* sumcut here lays from 0 to weight_sum-1 */
 	sumcut = sum % weight_sum;
 	sumcut = sum % weight_sum;
+
 	/*
 	/*
-	 * sumcut here lays from 0 to weight_sum-1.
 	 * Scan proxy list and decrease until appropriate proxy is found.
 	 * Scan proxy list and decrease until appropriate proxy is found.
 	 */
 	 */
 	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
 	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
+		/* Select only between enabled machines */
 		if (node->rn_disabled)
 		if (node->rn_disabled)
 			continue;
 			continue;
+
+		/* Found enabled machine */
 		if (sumcut < node->rn_weight)
 		if (sumcut < node->rn_weight)
 			goto found;
 			goto found;
+
+		/* Update sumcut if enabled machine */
 		sumcut -= node->rn_weight;
 		sumcut -= node->rn_weight;
 	}
 	}
+
 	/* No node list */
 	/* No node list */
 	return NULL;
 	return NULL;
+
 found:
 found:
 	if (do_test) {
 	if (do_test) {
 		node->rn_disabled = rtpp_test(node, node->rn_disabled, 0);
 		node->rn_disabled = rtpp_test(node, node->rn_disabled, 0);
 		if (node->rn_disabled)
 		if (node->rn_disabled)
 			goto retry;
 			goto retry;
 	}
 	}
+
+	/* build hash table entry */
+	struct rtpengine_hash_entry *entry = shm_malloc(sizeof(struct rtpp_node));
+	if (shm_str_dup(&entry->callid, &callid) < 0) {
+		LM_ERR("rtpengine hash table fail to duplicate calllen=%d callid=%.*s",
+			callid.len, callid.len, callid.s);
+	}
+	entry->node = node;
+	entry->next = NULL;
+	entry->tout = get_ticks() + hash_entry_tout;
+
+	/* Insert the key<->entry from the hashtable */
+	if (!rtpengine_hash_table_insert(&callid, entry)) {
+		LM_ERR("rtpengine hash table fail to insert node=%.*s for calllen=%d callid=%.*s",
+			node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
+	} else {
+		LM_DBG("rtpengine hash table insert node=%.*s for calllen=%d callid=%.*s\n",
+			node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
+	}
+
+	/* Return selected node  */
 	return node;
 	return node;
 }
 }
 
 
+/*
+ * lookup the hastable (key=callid value=node) and get the old node
+ */
+static struct rtpp_node *
+select_rtpp_node_old(str callid, int do_test, int op)
+{
+	struct rtpp_node *node = NULL;
+	struct rtpengine_hash_entry *entry = NULL;
+
+	entry = rtpengine_hash_table_lookup(&callid);
+	if (!entry) {
+		LM_ERR("rtpengine hash table lookup failed to find entry for calllen=%d callid=%.*s\n",
+			callid.len, callid.len, callid.s);
+	} else {
+		LM_DBG("rtpengine hash table lookup find entry for calllen=%d callid=%.*s\n",
+			callid.len, callid.len, callid.s);
+		node = entry->node;
+	}
+
+	if (!node) {
+		LM_ERR("rtpengine hash table lookup failed to find node for calllen=%d callid=%.*s\n",
+			callid.len, callid.len, callid.s);
+		return NULL;
+	} else {
+		LM_DBG("rtpengine hash table lookup find node=%.*s for calllen=%d callid=%.*s\n",
+			node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
+	}
+
+	// if node broke, don't send any message
+	if (!node->rn_disabled) {
+		return node;
+	} else {
+		LM_DBG("rtpengine hash table lookup find node=%.*s for calllen=%d callid=%.*s, which is disabled!\n",
+			node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
+	}
+
+	return NULL;
+}
+
+/*
+ * Main balancing routine. This DO try to keep the same proxy for
+ * the call if some proxies were disabled or enabled (e.g. kamctl command)
+ */
+static struct rtpp_node *
+select_rtpp_node(str callid, int do_test, int op)
+{
+	if(!active_rtpp_set) {
+		LM_ERR("script error - no valid set selected\n");
+		return NULL;
+	}
+
+	// calculate and choose a node
+	if (op == OP_OFFER) {
+		// run the selection algorithm
+		return select_rtpp_node_new(callid, do_test, op);
+	} else {
+		// lookup the hastable (key=callid value=node) and get the old node
+		return select_rtpp_node_old(callid, do_test, op);
+	}
+}
+
 static int
 static int
 get_extra_id(struct sip_msg* msg, str *id_str) {
 get_extra_id(struct sip_msg* msg, str *id_str) {
 	if(msg==NULL || extra_id_pv==NULL || id_str==NULL) {
 	if(msg==NULL || extra_id_pv==NULL || id_str==NULL) {

+ 314 - 0
modules/rtpengine/rtpengine_hash.c

@@ -0,0 +1,314 @@
+#include "rtpengine_hash.h"
+
+#include "../../str.h"
+#include "../../dprint.h"
+#include "../../mem/shm_mem.h"
+#include "../../locking.h"
+#include "../../timer.h"
+
+static gen_lock_t *rtpengine_hash_lock;
+static struct rtpengine_hash_table *rtpengine_hash_table;
+
+/* from sipwise rtpengine */
+static int str_cmp_str(const str *a, const str *b) {
+	if (a->len < b->len)
+		return -1;
+	if (a->len > b->len)
+		return 1;
+	if (a->len == 0 && b->len == 0)
+		return 0;
+	return memcmp(a->s, b->s, a->len);
+}
+
+/* from sipwise rtpengine */
+static int str_equal(void *a, void *b) {
+	return (str_cmp_str((str *) a, (str *) b) == 0);
+}
+
+/* from sipwise rtpengine */
+static unsigned int str_hash(void *ss) {
+	const str *s = (str*) ss;
+	unsigned int ret = 5381;
+	str it = *s;
+
+	while (it.len > 0) {
+		ret = (ret << 5) + ret + *it.s;
+		it.s++;
+		it.len--;
+	}
+
+	return ret % RTPENGINE_HASH_TABLE_SIZE;
+}
+
+/* rtpengine glib hash API */
+int rtpengine_hash_table_init() {
+	int i;
+
+	// init hashtable
+	rtpengine_hash_table = shm_malloc(sizeof(struct rtpengine_hash_table));
+	if (!rtpengine_hash_table) {
+		LM_ERR("no shm left to create rtpengine_hash_table\n");
+		return 0;
+	}
+
+	// init hashtable entry_list heads (never filled)
+	for (i = 0; i < RTPENGINE_HASH_TABLE_SIZE; i++) {
+		rtpengine_hash_table->entry_list[i] = shm_malloc(sizeof(struct rtpengine_hash_entry));
+		if (!rtpengine_hash_table->entry_list[i]) {
+			LM_ERR("no shm left to create rtpengine_hash_table->entry_list[%d]\n", i);
+			return 0;
+		}
+
+		/* never expire the head of the hashtable index lists */
+		rtpengine_hash_table->entry_list[i]->tout = -1;
+		rtpengine_hash_table->entry_list[i]->next = NULL;
+	}
+
+	// init lock
+	rtpengine_hash_lock = lock_alloc();
+	if (!rtpengine_hash_lock) {
+		LM_ERR("no shm left to init rtpengine_hash_table lock");
+		return 0;
+	}
+
+	return 1;
+}
+
+int rtpengine_hash_table_destroy() {
+	int i;
+	struct rtpengine_hash_entry *entry, *last_entry;
+
+	// check rtpengine hashtable
+	if (!rtpengine_hash_table) {
+		LM_ERR("NULL rtpengine_hash_table");
+		return 0;
+	}
+
+	// destroy hashtable entry_list content
+	lock_get(rtpengine_hash_lock);
+	for (i = 0; i < RTPENGINE_HASH_TABLE_SIZE; i++) {
+		entry = rtpengine_hash_table->entry_list[i];
+		while (entry) {
+			last_entry = entry;
+			entry = entry->next;
+			shm_free(last_entry->callid.s);
+			shm_free(last_entry);
+		}
+	}
+
+	// destroy hashtable
+	shm_free(rtpengine_hash_table);
+	rtpengine_hash_table = NULL;
+	lock_release(rtpengine_hash_lock);
+
+	// destroy lock
+	if (!rtpengine_hash_lock) {
+		LM_ERR("NULL rtpengine_hash_lock");
+	} else {
+		lock_dealloc(rtpengine_hash_lock);
+		rtpengine_hash_lock = NULL;
+	}
+
+	return 1;
+}
+
+int rtpengine_hash_table_insert(void *key, void *value) {
+	struct rtpengine_hash_entry *entry, *last_entry;
+	struct rtpengine_hash_entry *new_entry = (struct rtpengine_hash_entry *) value;
+	unsigned int hash_index;
+
+	// check rtpengine hashtable
+	if (!rtpengine_hash_table) {
+		LM_ERR("NULL rtpengine_hash_table");
+		return 0;
+	}
+
+	// get entry list
+	hash_index = str_hash(key);
+	entry = rtpengine_hash_table->entry_list[hash_index];
+	last_entry = entry;
+
+	// lock
+	lock_get(rtpengine_hash_lock);
+	while (entry) {
+		// if key found, don't add new entry
+		if (str_equal(&entry->callid, &new_entry->callid)) {
+			// unlock
+			lock_release(rtpengine_hash_lock);
+			LM_ERR("Call id %.*s already in hashtable, ignore new value", entry->callid.len, entry->callid.s);
+			return 0;
+		}
+
+		// if expired entry discovered, delete it
+		if (entry->tout < get_ticks()) {
+			// set pointers; exclude entry
+			last_entry->next = entry->next;
+
+			// free current entry; entry points to unknown
+			shm_free(entry->callid.s);
+			shm_free(entry);
+
+			// set pointers
+			entry = last_entry;
+		}
+
+		// next entry in the list
+		last_entry = entry;
+		entry = entry->next;
+	}
+
+	last_entry->next = new_entry;
+
+	// unlock
+	lock_release(rtpengine_hash_lock);
+
+	return 1;
+}
+
+int rtpengine_hash_table_remove(void *key) {
+	struct rtpengine_hash_entry *entry, *last_entry;
+	unsigned int hash_index;
+
+	// check rtpengine hashtable
+	if (!rtpengine_hash_table) {
+		LM_ERR("NULL rtpengine_hash_table");
+		return 0;
+	}
+
+	// get first entry from entry list; jump over unused list head
+	hash_index = str_hash(key);
+	entry = rtpengine_hash_table->entry_list[hash_index];
+	last_entry = entry;
+
+	// lock
+	lock_get(rtpengine_hash_lock);
+	while (entry) {
+		// if key found, delete entry
+		if (str_equal(&entry->callid, (str *)key)) {
+			// free entry
+			last_entry->next = entry->next;
+			shm_free(entry->callid.s);
+			shm_free(entry);
+
+			// unlock
+			lock_release(rtpengine_hash_lock);
+
+			return 1;
+		}
+
+		// if expired entry discovered, delete it
+		if (entry->tout < get_ticks()) {
+			// set pointers; exclude entry
+			last_entry->next = entry->next;
+
+			// free current entry; entry points to unknown
+			shm_free(entry->callid.s);
+			shm_free(entry);
+
+			// set pointers
+			entry = last_entry;
+		}
+
+		last_entry = entry;
+		entry = entry->next;
+	}
+
+	// unlock
+	lock_release(rtpengine_hash_lock);
+
+	return 0;
+}
+
+void* rtpengine_hash_table_lookup(void *key) {
+	struct rtpengine_hash_entry *entry, *last_entry;
+	unsigned int hash_index;
+
+	// check rtpengine hashtable
+	if (!rtpengine_hash_table) {
+		LM_ERR("NULL rtpengine_hash_table");
+		return 0;
+	}
+
+	// get first entry from entry list; jump over unused list head
+	hash_index = str_hash(key);
+	entry = rtpengine_hash_table->entry_list[hash_index];
+	last_entry = entry;
+
+	// lock
+	lock_get(rtpengine_hash_lock);
+	while (entry) {
+		// if key found, return entry
+		if (str_equal(&entry->callid, (str *)key)) {
+			// unlock
+			lock_release(rtpengine_hash_lock);
+
+			return entry;
+		}
+
+		// if expired entry discovered, delete it
+		if (entry->tout < get_ticks()) {
+			// set pointers; exclude entry
+			last_entry->next = entry->next;
+
+			// free current entry; entry points to unknown
+			shm_free(entry->callid.s);
+			shm_free(entry);
+
+			// set pointers
+			entry = last_entry;
+		}
+
+		last_entry = entry;
+		entry = entry->next;
+	}
+
+	// unlock
+	lock_release(rtpengine_hash_lock);
+
+	return NULL;
+}
+
+// print hash table entries while deleting expired entries
+void rtpengine_hash_table_print() {
+	int i;
+	struct rtpengine_hash_entry *entry, *last_entry;
+
+	// check rtpengine hashtable
+	if (!rtpengine_hash_table) {
+		LM_ERR("NULL rtpengine_hash_table");
+		return ;
+	}
+
+	// lock
+	lock_get(rtpengine_hash_lock);
+
+	// print hashtable
+	for (i = 0; i < RTPENGINE_HASH_TABLE_SIZE; i++) {
+		entry = rtpengine_hash_table->entry_list[i];
+		last_entry = entry;
+
+		while (entry) {
+			// if expired entry discovered, delete it
+			if (entry->tout < get_ticks()) {
+				// set pointers; exclude entry
+				last_entry->next = entry->next;
+
+				// free current entry; entry points to unknown
+				shm_free(entry->callid.s);
+				shm_free(entry);
+
+				// set pointers
+				entry = last_entry;
+			} else {
+				LM_DBG("hash_index=%d callid=%.*s tout=%u\n",
+					i, entry->callid.len, entry->callid.s, entry->tout - get_ticks());
+			}
+
+			last_entry = entry;
+			entry = entry->next;
+		}
+	}
+
+	// unlock
+	lock_release(rtpengine_hash_lock);
+}

+ 30 - 0
modules/rtpengine/rtpengine_hash.h

@@ -0,0 +1,30 @@
+#ifndef _RTPENGINE_HASH_H
+#define _RTPENGINE_HASH_H
+
+#include "../../str.h"
+
+#define RTPENGINE_HASH_TABLE_SIZE       512
+
+/* table entry */
+struct rtpengine_hash_entry {
+	unsigned int tout;			// call timeout
+	str callid;				// call callid
+	struct rtpp_node *node;			// call selected node
+
+	struct rtpengine_hash_entry *next;	// next 
+};
+
+/* table */
+struct rtpengine_hash_table {
+	struct rtpengine_hash_entry *entry_list[RTPENGINE_HASH_TABLE_SIZE];
+};
+
+
+int rtpengine_hash_table_init();
+int rtpengine_hash_table_destroy();
+int rtpengine_hash_table_insert(void *key, void *value);
+int rtpengine_hash_table_remove(void *key);
+void* rtpengine_hash_table_lookup(void *key);
+void rtpengine_hash_table_print() ;
+
+#endif