Browse Source

dispatcher(k): basic framework for load dispatching

- a lightweight system to do a fair distrubution based on load
- no dependency on dialog, by using an internal table of active calls
  with minimal info, to keep the module suitable for small devices and
  have good performances
- not completed, requires xavp support for a compact info structure to
  be carried in transaction for each failover step
Daniel-Constantin Mierla 15 years ago
parent
commit
59c31fe72e

+ 92 - 2
modules_k/dispatcher/dispatch.c

@@ -70,6 +70,7 @@
 #include "../../lib/srdb1/db_res.h"
 #include "../../str.h"
 
+#include "ds_ht.h"
 #include "dispatch.h"
 
 #define DS_TABLE_VERSION	1
@@ -79,7 +80,7 @@
 
 static int _ds_table_version = DS_TABLE_VERSION;
 
-#define DS_DUID_SIZE	16
+static ds_ht_t *_dsht_load = NULL;
 
 typedef struct _ds_attrs
 {
@@ -93,6 +94,7 @@ typedef struct _ds_dest
 	str uri;
 	int flags;
 	int priority;
+	int dload;
 	ds_attrs_t attrs;
 	struct ip_addr ip_address; 	/*!< IP-Address of the entry */
 	unsigned short int port; 	/*!< Port of the request URI */
@@ -1134,6 +1136,49 @@ int ds_hash_pvar(struct sip_msg *msg, unsigned int *hash)
 	return 0;
 }
 
+int ds_get_leastloaded(ds_set_t *dset)
+{
+	int j;
+	int k;
+	int t;
+
+	k = 0;
+	t = dset->dlist[k].dload;
+	for(j=1; j<dset->nr; j++)
+	{
+		if(!((dset->dlist[j].flags & DS_INACTIVE_DST)
+				|| (dset->dlist[j].flags & DS_PROBING_DST)))
+		{
+			if(dset->dlist[j].dload<t)
+			{
+				k = j;
+				t = dset->dlist[k].dload;
+			}
+		}
+	}
+	return k;
+}
+
+int ds_update_load(struct sip_msg *msg, ds_set_t *dset, int setid, int dst)
+{
+	if(dset->dlist[dst].attrs.duid[0]=='\0')
+	{
+		LM_ERR("dst unique id not set for %d (%.*s)\n", setid,
+				msg->callid->body.len, msg->callid->body.s);
+		return -1;
+	}
+
+	if(ds_add_cell(_dsht_load, &msg->callid->body,
+			dset->dlist[dst].attrs.duid, setid)<0)
+	{
+		LM_ERR("cannot add load to %d (%.*s)\n", setid,
+				msg->callid->body.len, msg->callid->body.s);
+		return -1;
+	}
+	dset->dlist[dst].dload++;
+	return 0;
+}
+
 static inline int ds_get_index(int group, ds_set_t **index)
 {
 	ds_set_t *si = NULL;
@@ -1309,10 +1354,14 @@ int ds_select_dst(struct sip_msg *msg, int set, int alg, int mode)
 		case 8: /* use always first entry */
 			hash = 0;
 		break;
-		case 9: /* weight based load */
+		case 9: /* weight based distribution */
 			hash = idx->wlist[idx->wlast];
 			idx->wlast = (idx->wlast+1) % 100;
 		break;
+		case 10: /* load based distribution */
+			hash = ds_get_leastloaded(idx);
+			ds_update_load(msg, idx, set, hash);
+		break;
 		default:
 			LM_WARN("algo %d not implemented - using first entry...\n", alg);
 			hash = 0;
@@ -1804,3 +1853,44 @@ void ds_check_timer(unsigned int ticks, void* param)
 		}
 	}
 }
+
+void ds_ht_timer(unsigned int ticks, void *param)
+{
+	ds_cell_t *it;
+	ds_cell_t *it0;
+	time_t now;
+	int i;
+
+	if(_dsht_load==NULL)
+		return;
+
+	now = time(NULL);
+	
+	for(i=0; i<_dsht_load->htsize; i++)
+	{
+		/* free entries */
+		lock_get(&_dsht_load->entries[i].lock);
+		it = _dsht_load->entries[i].first;
+		while(it)
+		{
+			it0 = it->next;
+			if(it->expire!=0 && it->expire<now)
+			{
+				/* expired */
+				if(it->prev==NULL)
+					_dsht_load->entries[i].first = it->next;
+				else
+					it->prev->next = it->next;
+				if(it->next)
+					it->next->prev = it->prev;
+				_dsht_load->entries[i].esize--;
+				/* execute ds unload callback */
+				ds_cell_free(it);
+			}
+			it = it0;
+		}
+		lock_release(&_dsht_load->entries[i].lock);
+	}
+	return;
+}
+

+ 6 - 0
modules_k/dispatcher/dispatch.h

@@ -106,5 +106,11 @@ int ds_is_from_list(struct sip_msg *_m, int group);
  */
 void ds_check_timer(unsigned int ticks, void* param);
 
+
+/*! \brief
+ * Timer for checking active calls load
+ */
+void ds_ht_timer(unsigned int ticks, void *param);
+
 #endif
 

+ 10 - 0
modules_k/dispatcher/dispatcher.c

@@ -63,6 +63,7 @@
 #include "../../mem/mem.h"
 #include "../../mod_fix.h"
 
+#include "ds_ht.h"
 #include "dispatch.h"
 
 MODULE_VERSION
@@ -100,6 +101,8 @@ str ds_ping_from   = {"sip:dispatcher@localhost", 24};
 static int ds_ping_interval = 0;
 int ds_probing_mode  = 0;
 int ds_append_branch = 1;
+int ds_hash_size = 0;
+int ds_hash_expire = 7200;
 
 /* tm */
 struct tm_binds tmb;
@@ -175,6 +178,7 @@ static param_export_t params[]={
 	{"ds_ping_interval",   INT_PARAM, &ds_ping_interval},
 	{"ds_probing_mode",    INT_PARAM, &ds_probing_mode},
 	{"ds_append_branch",   INT_PARAM, &ds_append_branch},
+	{"ds_hash_sixe",       INT_PARAM, &ds_hash_size},
 	{0,0,0}
 };
 
@@ -232,6 +236,12 @@ static int mod_init(void)
 	if(init_data()!= 0)
 		return -1;
 
+	if(ds_hash_size>0)
+	{
+		if(ds_ht_init(1<<ds_hash_size, ds_hash_expire)<0)
+			return -1;
+		register_timer(ds_ht_timer, NULL, 10);
+	}
 	if(ds_db_url.s)
 	{
 		ds_db_url.len     = strlen(ds_db_url.s);

+ 278 - 0
modules_k/dispatcher/ds_ht.c

@@ -0,0 +1,278 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2010 Daniel-Constantin Mierla (asipto.com)
+ *
+ * This file is part of kamailio, a free SIP server.
+ *
+ * openser is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * openser is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <regex.h>
+
+#include "../../mem/shm_mem.h"
+#include "../../mem/mem.h"
+#include "../../dprint.h"
+#include "../../lib/kcore/hash_func.h"
+#include "../../ut.h"
+
+#include "ds_ht.h"
+
+
+#define ds_compute_hash(_s)        core_case_hash(_s,0,0)
+#define ds_get_entry(_h,_size)    (_h)&((_size)-1)
+
+
+ds_cell_t* ds_cell_new(str *cid, char *did, int dset, unsigned int cellid)
+{
+	ds_cell_t *cell;
+	unsigned int msize;
+
+	msize = sizeof(ds_cell_t) + (cid->len + 1)*sizeof(char);
+
+	cell = (ds_cell_t*)shm_malloc(msize);
+	if(cell==NULL)
+	{
+		LM_ERR("no more shm\n");
+		return NULL;
+	}
+
+	memset(cell, 0, msize);
+	cell->cellid = cellid;
+	cell->dset = dset;
+	cell->callid.len = cid->len;
+	cell->callid.s = (char*)cell + sizeof(ds_cell_t);
+	memcpy(cell->callid.s, cid->s, cid->len);
+	cell->callid.s[cid->len] = '\0';
+	strcpy(cell->duid, did);
+	return cell;
+}
+
+int ds_cell_free(ds_cell_t *cell)
+{
+	if(cell==NULL)
+		return -1;
+	shm_free(cell);
+	return 0;
+}
+
+
+
+ds_ht_t *ds_ht_init(unsigned int htsize, int expire)
+{
+	int i;
+	ds_ht_t *dsht = NULL;
+
+	dsht = (ds_ht_t*)shm_malloc(sizeof(ds_ht_t));
+	if(dsht==NULL)
+	{
+		LM_ERR("no more shm\n");
+		return NULL;
+	}
+	memset(dsht, 0, sizeof(ds_ht_t));
+	dsht->htsize = htsize;
+	dsht->htexpire = expire;
+
+	dsht->entries = (ds_entry_t*)shm_malloc(dsht->htsize*sizeof(ds_entry_t));
+	if(dsht->entries==NULL)
+	{
+		LM_ERR("no more shm.\n");
+		shm_free(dsht);
+		dsht = NULL;
+		return NULL;
+	}
+	memset(dsht->entries, 0, dsht->htsize*sizeof(ds_entry_t));
+
+	for(i=0; i<dsht->htsize; i++)
+	{
+		if(lock_init(&dsht->entries[i].lock)==0)
+		{
+			LM_ERR("cannot initalize lock[%d]\n", i);
+			i--;
+			while(i>=0)
+			{
+				lock_destroy(&dsht->entries[i].lock);
+				i--;
+			}
+			shm_free(dsht->entries);
+			shm_free(dsht);
+			dsht = NULL;
+			return NULL;
+		}
+	}
+
+	return dsht;
+}
+
+int ds_ht_destroy(ds_ht_t *dsht)
+{
+	int i;
+	ds_cell_t *it, *it0;
+
+	if(dsht==NULL)
+		return -1;
+
+	for(i=0; i<dsht->htsize; i++)
+	{
+		/* free entries */
+		it = dsht->entries[i].first;
+		while(it)
+		{
+			it0 = it;
+			it = it->next;
+			ds_cell_free(it0);
+		}
+		/* free locks */
+		lock_destroy(&dsht->entries[i].lock);
+	}
+	shm_free(dsht->entries);
+	shm_free(dsht);
+	dsht = NULL;
+	return 0;
+}
+
+
+int ds_add_cell(ds_ht_t *dsht, str *cid, char *duid, int dset)
+{
+	unsigned int idx;
+	unsigned int hid;
+	ds_cell_t *it, *prev, *cell;
+	time_t now;
+
+	if(dsht==NULL || dsht->entries==NULL)
+		return -1;
+
+	hid = ds_compute_hash(cid);
+	
+	idx = ds_get_entry(hid, dsht->htsize);
+
+	now = time(NULL);
+	prev = NULL;
+	lock_get(&dsht->entries[idx].lock);
+	it = dsht->entries[idx].first;
+	while(it!=NULL && it->cellid < hid)
+	{
+		prev = it;
+		it = it->next;
+	}
+	while(it!=NULL && it->cellid == hid)
+	{
+		if(cid->len==it->callid.len 
+				&& strncmp(cid->s, it->callid.s, cid->len)==0)
+		{
+			lock_release(&dsht->entries[idx].lock);
+			return -2;
+		}
+		prev = it;
+		it = it->next;
+	}
+	/* add */
+	cell = ds_cell_new(cid, duid, dset, hid);
+	if(cell == NULL)
+	{
+		LM_ERR("cannot create new cell.\n");
+		lock_release(&dsht->entries[idx].lock);
+		return -1;
+	}
+	cell->expire = now + dsht->htexpire;
+	if(prev==NULL)
+	{
+		if(dsht->entries[idx].first!=NULL)
+		{
+			cell->next = dsht->entries[idx].first;
+			dsht->entries[idx].first->prev = cell;
+		}
+		dsht->entries[idx].first = cell;
+	} else {
+		cell->next = prev->next;
+		cell->prev = prev;
+		if(prev->next)
+			prev->next->prev = cell;
+		prev->next = cell;
+	}
+	dsht->entries[idx].esize++;
+	lock_release(&dsht->entries[idx].lock);
+	return 0;
+}
+
+int ds_del_cell(ds_ht_t *dsht, str *cid)
+{
+	unsigned int idx;
+	unsigned int hid;
+	ds_cell_t *it;
+
+	if(dsht==NULL || dsht->entries==NULL)
+		return -1;
+
+	hid = ds_compute_hash(cid);
+	
+	idx = ds_get_entry(hid, dsht->htsize);
+
+	/* head test and return */
+	if(dsht->entries[idx].first==NULL)
+		return 0;
+	
+	lock_get(&dsht->entries[idx].lock);
+	it = dsht->entries[idx].first;
+	while(it!=NULL && it->cellid < hid)
+		it = it->next;
+	while(it!=NULL && it->cellid == hid)
+	{
+		if(cid->len==it->callid.len 
+				&& strncmp(cid->s, it->callid.s, cid->len)==0)
+		{
+			/* found */
+			if(it->prev==NULL)
+				dsht->entries[idx].first = it->next;
+			else
+				it->prev->next = it->next;
+			if(it->next)
+				it->next->prev = it->prev;
+			dsht->entries[idx].esize--;
+			lock_release(&dsht->entries[idx].lock);
+			ds_cell_free(it);
+			return 0;
+		}
+		it = it->next;
+	}
+	lock_release(&dsht->entries[idx].lock);
+	return 0;
+}
+
+int ds_ht_dbg(ds_ht_t *dsht)
+{
+	int i;
+	ds_cell_t *it;
+
+	for(i=0; i<dsht->htsize; i++)
+	{
+		lock_get(&dsht->entries[i].lock);
+		LM_ERR("htable[%d] -- <%d>\n", i, dsht->entries[i].esize);
+		it = dsht->entries[i].first;
+		while(it)
+		{
+			LM_ERR("\tcell: %.*s\n", it->callid.len, it->callid.s);
+			LM_ERR("\tduid: %s\n", it->duid);
+			LM_ERR("\thid: %u expire: %u\n", it->cellid,
+					(unsigned int)it->expire);
+			LM_ERR("\tdset:%d\n", it->dset);
+			it = it->next;
+		}
+		lock_release(&dsht->entries[i].lock);
+	}
+	return 0;
+}
+
+

+ 67 - 0
modules_k/dispatcher/ds_ht.h

@@ -0,0 +1,67 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2010 Daniel-Constantin Mierla (asipto.com)
+ *
+ * This file is part of kamailio, a free SIP server.
+ *
+ * openser is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * openser is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+		       
+#ifndef _DS_HT_H_
+#define _DS_HT_H_
+
+#include <time.h>
+
+#include "../../str.h"
+#include "../../locking.h"
+
+#define DS_DUID_SIZE	16
+
+typedef struct _ds_cell
+{
+    unsigned int cellid;
+	str callid;
+	char duid[DS_DUID_SIZE];
+	int dset;
+	time_t  expire;
+    struct _ds_cell *prev;
+    struct _ds_cell *next;
+} ds_cell_t;
+
+typedef struct _ds_entry
+{
+	unsigned int esize;
+	ds_cell_t *first;
+	gen_lock_t lock;	
+} ds_entry_t;
+
+typedef struct _ds_ht
+{
+	unsigned int htexpire;
+	unsigned int htsize;
+	ds_entry_t *entries;
+	struct _ds_ht *next;
+} ds_ht_t;
+
+ds_ht_t *ds_ht_init(unsigned int htsize, int expire);
+int ds_ht_destroy(ds_ht_t *dsht);
+int ds_add_cell(ds_ht_t *dsht, str *cid, char *did, int dset);
+int ds_del_cell(ds_ht_t *dsht, str *cid);
+
+int ds_ht_dbg(ds_ht_t *dsht);
+int ds_cell_free(ds_cell_t *cell);
+
+#endif