Selaa lähdekoodia

Merge branch 'mariusbucur/dmq'

Marius Bucur 14 vuotta sitten
vanhempi
commit
93d10d9e59

+ 23 - 0
modules_k/dmq/Makefile

@@ -0,0 +1,23 @@
+# $Id$
+#
+# distributed message queue system for inter-intstance communication
+#
+# 
+# WARNING: do not run this directly, it should be run by the master Makefile
+
+include ../../Makefile.defs
+auto_gen=
+NAME=dmq.so
+LIBS=
+
+DEFS+=-I/usr/include/libxml2 -I$(LOCALBASE)/include/libxml2 \
+      -I$(LOCALBASE)/include
+LIBS+=-L/usr/include/lib  -L$(LOCALBASE)/lib -lxml2
+
+DEFS+=-DOPENSER_MOD_INTERFACE
+
+SERLIBPATH=../../lib
+SER_LIBS+=$(SERLIBPATH)/kmi/kmi
+SER_LIBS+=$(SERLIBPATH)/srdb1/srdb1
+SER_LIBS+=$(SERLIBPATH)/kcore/kcore
+include ../../Makefile.modules

+ 11 - 0
modules_k/dmq/bind_dmq.c

@@ -0,0 +1,11 @@
+#include "dmq.h"
+#include "bind_dmq.h"
+#include "peer.h"
+#include "dmq_funcs.h"
+
+int bind_dmq(dmq_api_t* api) {
+	api->register_dmq_peer = register_dmq_peer;
+	api->send_message = send_dmq_message;
+	api->bcast_message = bcast_dmq_message;
+	return 0;
+}

+ 21 - 0
modules_k/dmq/bind_dmq.h

@@ -0,0 +1,21 @@
+#ifndef BIND_DMQ_H
+#define BIND_DMQ_H
+
+#include "peer.h"
+#include "dmqnode.h"
+#include "dmq_funcs.h"
+
+typedef int (*bcast_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards);
+typedef int (*send_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards);
+
+typedef struct dmq_api {
+	register_dmq_peer_t register_dmq_peer;
+	bcast_message_t bcast_message;
+	send_message_t send_message;
+} dmq_api_t;
+
+typedef int (*bind_dmq_f)(dmq_api_t* api);
+
+int bind_dmq(dmq_api_t* api);
+
+#endif

+ 283 - 0
modules_k/dmq/dmq.c

@@ -0,0 +1,283 @@
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * History:
+ * --------
+ *  2010-03-29  initial version (mariusbucur)
+ */
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/ipc.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <time.h>
+
+#include "../../sr_module.h"
+#include "../../dprint.h"
+#include "../../error.h"
+#include "../../ut.h"
+#include "../../mem/mem.h"
+#include "../../mem/shm_mem.h"
+#include "../../usr_avp.h"
+#include "../../modules/tm/tm_load.h"
+#include "../../parser/parse_uri.h"
+#include "../../modules/sl/sl.h"
+#include "../../pt.h"
+#include "../../lib/kmi/mi.h"
+#include "../../lib/kcore/hash_func.h"
+#include "dmq.h"
+#include "dmq_funcs.h"
+#include "peer.h"
+#include "bind_dmq.h"
+#include "worker.h"
+#include "notification_peer.h"
+#include "dmqnode.h"
+#include "../../mod_fix.h"
+
+static int mod_init(void);
+static int child_init(int);
+static void destroy(void);
+
+MODULE_VERSION
+
+int startup_time = 0;
+int pid = 0;
+
+/* module parameters */
+int num_workers = DEFAULT_NUM_WORKERS;
+str dmq_server_address = {0, 0};
+struct sip_uri dmq_server_uri;
+
+str dmq_notification_address = {0, 0};
+struct sip_uri dmq_notification_uri;
+int ping_interval = 4;
+
+/* TM bind */
+struct tm_binds tmb;
+/* SL API structure */
+sl_api_t slb;
+
+/** module variables */
+str dmq_request_method = {"KDMQ", 4};
+dmq_worker_t* workers;
+dmq_peer_list_t* peer_list;
+/* the list of dmq servers */
+dmq_node_list_t* node_list;
+// the dmq module is a peer itself for receiving notifications regarding nodes
+dmq_peer_t* dmq_notification_peer;
+
+/** module functions */
+static int mod_init(void);
+static int child_init(int);
+static void destroy(void);
+static int handle_dmq_fixup(void** param, int param_no);
+static int parse_server_address(str* uri, struct sip_uri* parsed_uri);
+
+static cmd_export_t cmds[] = {
+	{"handle_dmq_message",  (cmd_function)handle_dmq_message, 0, handle_dmq_fixup, 0, 
+		REQUEST_ROUTE},
+	{"bind_dmq",        (cmd_function)bind_dmq, 0, 0, 0,
+		REQUEST_ROUTE},
+	{0, 0, 0, 0, 0, 0}
+};
+
+static param_export_t params[] = {
+	{"num_workers", INT_PARAM, &num_workers},
+	{"ping_interval", INT_PARAM, &ping_interval},
+	{"server_address", STR_PARAM, &dmq_server_address.s},
+	{"notification_address", STR_PARAM, &dmq_notification_address.s},
+	{0, 0, 0}
+};
+
+static mi_export_t mi_cmds[] = {
+	{0, 0, 0, 0, 0}
+};
+
+/** module exports */
+struct module_exports exports = {
+	"dmq",				/* module name */
+	DEFAULT_DLFLAGS,		/* dlopen flags */
+	cmds,				/* exported functions */
+	params,				/* exported parameters */
+	0,				/* exported statistics */
+	mi_cmds,   			/* exported MI functions */
+	0,				/* exported pseudo-variables */
+	0,				/* extra processes */
+	mod_init,			/* module initialization function */
+	0,   				/* response handling function */
+	(destroy_function) destroy, 	/* destroy function */
+	child_init                  	/* per-child init function */
+};
+
+/**
+ * init module function
+ */
+static int mod_init(void) {
+	
+	if(register_mi_mod(exports.name, mi_cmds)!=0) {
+		LM_ERR("failed to register MI commands\n");
+		return -1;
+	}
+
+	/* bind the SL API */
+	if (sl_load_api(&slb)!=0) {
+		LM_ERR("cannot bind to SL API\n");
+		return -1;
+	}
+	
+	/* load all TM stuff */
+	if(load_tm_api(&tmb)==-1) {
+		LM_ERR("can't load tm functions. TM module probably not loaded\n");
+		return -1;
+	}
+	/* load peer list - the list containing the module callbacks for dmq */
+	
+	peer_list = init_peer_list();
+	
+	/* load the dmq node list - the list containing the dmq servers */
+	node_list = init_dmq_node_list();
+	
+	/* register worker processes - add one because of the ping process */
+	register_procs(num_workers);
+	
+	/* check server_address and notification_address are not empty and correct */
+	if(parse_server_address(&dmq_server_address, &dmq_server_uri) < 0) {
+		LM_ERR("server address invalid\n");
+		return -1;
+	}
+	
+	if(parse_server_address(&dmq_notification_address, &dmq_notification_uri) < 0) {
+		LM_ERR("notification address invalid\n");
+		return -1;
+	}
+	
+	/* allocate workers array */
+	workers = shm_malloc(num_workers * sizeof(*workers));
+	if(workers == NULL) {
+		LM_ERR("error in shm_malloc\n");
+		return -1;
+	}
+	
+	/**
+         * add the dmq notification peer.
+	 * the dmq is a peer itself so that it can receive node notifications
+	 */
+	add_notification_peer();
+	
+	startup_time = (int) time(NULL);
+	
+	/**
+	 * add the ping timer
+	 * it pings the servers once in a while so that we know which failed
+	 */
+	if(ping_interval < MIN_PING_INTERVAL) {
+		ping_interval = MIN_PING_INTERVAL;
+	}
+	register_timer(ping_servers, 0, ping_interval);
+	
+	return 0;
+}
+
+/**
+ * initialize children
+ */
+static int child_init(int rank) {
+  	int i, newpid;
+	if (rank == PROC_MAIN) {
+		/* fork worker processes */
+		for(i = 0; i < num_workers; i++) {
+			init_worker(&workers[i]);
+			LM_DBG("starting worker process %d\n", i);
+			newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0);
+			if(newpid < 0) {
+				LM_ERR("failed to form process\n");
+				return -1;
+			} else if(newpid == 0) {
+				// child - this will loop forever
+				worker_loop(i);
+			} else {
+				workers[i].pid = newpid;
+			}
+		}
+		/* notification_node - the node from which the Kamailio instance
+		 * gets the server list on startup.
+		 * the address is given as a module parameter in dmq_notification_address
+		 * the module MUST have this parameter if the Kamailio instance is not
+		 * a master in this architecture
+		 */
+		if(dmq_notification_address.s) {
+			notification_node = add_server_and_notify(&dmq_notification_address);
+			if(!notification_node) {
+				LM_ERR("cannot retrieve initial nodelist from %.*s\n",
+				       STR_FMT(&dmq_notification_address));
+				return -1;
+			}
+		}
+		return 0;
+	}
+	if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {
+		/* do nothing for the main process */
+		return 0;
+	}
+
+	pid = my_pid();
+	return 0;
+}
+
+/*
+ * destroy function
+ */
+static void destroy(void) {
+	/* TODO unregister dmq node, free resources */
+	if(dmq_notification_address.s) {
+		LM_DBG("unregistering node %.*s\n", STR_FMT(&self_node->orig_uri));
+		self_node->status = DMQ_NODE_DISABLED;
+		request_nodelist(notification_node, 1);
+	}
+}
+
+static int handle_dmq_fixup(void** param, int param_no) {
+ 	return 0;
+}
+
+static int parse_server_address(str* uri, struct sip_uri* parsed_uri) {
+	if(!uri->s) {
+		LM_ERR("server address missing\n");
+		goto empty;
+	}
+	uri->len = strlen(uri->s);
+	if(!uri->len) {
+		LM_ERR("empty server address\n");
+		goto empty;
+	}
+	if(parse_uri(uri->s, uri->len, parsed_uri) < 0) {
+		LM_ERR("error parsing server address\n");
+		return -1;
+	}
+	return 0;
+empty:
+	uri->s = NULL;
+	return 0;
+}

+ 52 - 0
modules_k/dmq/dmq.h

@@ -0,0 +1,52 @@
+#ifndef DMQ_H
+#define DMQ_H
+
+#include "../../dprint.h"
+#include "../../error.h"
+#include "../../sr_module.h"
+#include "../../modules/tm/tm_load.h"
+#include "../../parser/parse_uri.h"
+#include "../../modules/sl/sl.h"
+#include "bind_dmq.h"
+#include "peer.h"
+#include "worker.h"
+
+#define DEFAULT_NUM_WORKERS	2
+#define MIN_PING_INTERVAL	60
+
+extern int num_workers;
+extern dmq_worker_t* workers;
+extern dmq_peer_t* dmq_notification_peer;
+extern str dmq_server_address;
+extern dmq_peer_list_t* peer_list;
+extern str dmq_request_method;
+extern struct sip_uri dmq_server_uri;
+extern str dmq_notification_address;
+extern struct sip_uri dmq_notification_uri;
+/* sl and tm */
+extern struct tm_binds tmb;
+extern sl_api_t slb;
+
+extern str dmq_200_rpl;
+extern str dmq_400_rpl;
+extern str dmq_500_rpl;
+extern str dmq_404_rpl;
+
+static inline int dmq_load_api(dmq_api_t* api) {
+	bind_dmq_f binddmq;
+	binddmq = (bind_dmq_f)find_export("bind_dmq", 0, 0);
+	if ( binddmq == 0) {
+		LM_ERR("cannot find bind_dmq\n");
+		return -1;
+	}
+	if (binddmq(api) < 0)
+	{
+		LM_ERR("cannot bind dmq api\n");
+		return -1;
+	}
+	return 0;
+}
+
+int handle_dmq_message(struct sip_msg* msg, char* str1 ,char* str2);
+
+#endif

+ 169 - 0
modules_k/dmq/dmq_funcs.c

@@ -0,0 +1,169 @@
+#include "dmq_funcs.h"
+#include "notification_peer.h"
+
+dmq_peer_t* register_dmq_peer(dmq_peer_t* peer) {
+	dmq_peer_t* new_peer;
+	lock_get(&peer_list->lock);
+	if(search_peer_list(peer_list, peer)) {
+		LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s,
+		       peer->description.len, peer->description.s);
+		lock_release(&peer_list->lock);
+		return NULL;
+	}
+	new_peer = add_peer(peer_list, peer);
+	lock_release(&peer_list->lock);
+	return new_peer;
+}
+
+void dmq_tm_callback(struct cell *t, int type, struct tmcb_params *ps) {
+	dmq_cback_param_t* cb_param = (dmq_cback_param_t*)(*ps->param);
+	LM_DBG("dmq_tm_callback start\n");
+	if(cb_param->resp_cback.f) {
+		if(cb_param->resp_cback.f(ps->rpl, ps->code, cb_param->node, cb_param->resp_cback.param) < 0) {
+			LM_ERR("error in response callback\n");
+		}
+	}
+	LM_DBG("dmq_tm_callback done\n");
+	shm_free_node(cb_param->node);
+	shm_free(cb_param);
+}
+
+int build_uri_str(str* username, struct sip_uri* uri, str* from) {
+	/* sip:user@host:port */
+	int from_len = username->len + uri->host.len + uri->port.len + 10;
+	if(!uri->host.s || !uri->host.len) {
+		LM_ERR("no host in uri\n");
+		return -1;
+	}
+	if(!username->s || !username->len) {
+		LM_ERR("no username given\n");
+		return -1;
+	}
+	from->s = pkg_malloc(from_len);
+	from->len = 0;
+	
+	memcpy(from->s, "sip:", 4);
+	from->len += 4;
+	
+	memcpy(from->s + from->len, username->s, username->len);
+	from->len += username->len;
+	
+	memcpy(from->s + from->len, "@", 1);
+	from->len += 1;
+	
+	memcpy(from->s + from->len, uri->host.s, uri->host.len);
+	from->len += uri->host.len;
+	
+	if(uri->port.s && uri->port.len) {
+		memcpy(from->s + from->len, ":", 1);
+		from->len += 1;
+		memcpy(from->s + from->len, uri->port.s, uri->port.len);
+		from->len += uri->port.len;
+	}
+	return 0;
+}
+
+/* broadcast a dmq message
+ * peer - the peer structure on behalf of which we are sending
+ * body - the body of the message
+ * except - we do not send the message to this node
+ * resp_cback - a response callback that gets called when the transaction is complete
+ */
+int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards) {
+	dmq_node_t* node;
+	
+	lock_get(&node_list->lock);
+	node = node_list->nodes;
+	while(node) {
+		/* we do not send the message to the following:
+		 *   - the except node
+		 *   - itself
+		 *   - any inactive nodes
+		 */
+		if((except && cmp_dmq_node(node, except)) || node->local || node->status != DMQ_NODE_ACTIVE) {
+			LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri));
+			node = node->next;
+			continue;
+		}
+		if(send_dmq_message(peer, body, node, resp_cback, max_forwards) < 0) {
+			LM_ERR("error sending dmq message\n");
+			goto error;
+		}
+		node = node->next;
+	}
+	lock_release(&node_list->lock);
+	return 0;
+error:
+	lock_release(&node_list->lock);
+	return -1;
+}
+
+/* send a dmq message
+ * peer - the peer structure on behalf of which we are sending
+ * body - the body of the message
+ * node - we send the message to this node
+ * resp_cback - a response callback that gets called when the transaction is complete
+ */
+int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards) {
+	uac_req_t uac_r;
+	str str_hdr = {0, 0};
+	str from, to, req_uri;
+	dmq_cback_param_t* cb_param = NULL;
+	int result = 0;
+	int len = 0;
+	
+	/* Max-Forwards */
+	str_hdr.len = 18 + CRLF_LEN;
+	str_hdr.s = pkg_malloc(str_hdr.len);
+	len += sprintf(str_hdr.s, "Max-Forwards: %d%s", max_forwards, CRLF);
+	str_hdr.len = len;
+	
+	cb_param = shm_malloc(sizeof(*cb_param));
+	memset(cb_param, 0, sizeof(*cb_param));
+	cb_param->resp_cback = *resp_cback;
+	cb_param->node = shm_dup_node(node);
+	
+	if(build_uri_str(&peer->peer_id, &dmq_server_uri, &from) < 0) {
+		LM_ERR("error building from string [username %.*s]\n", STR_FMT(&peer->peer_id));
+		goto error;
+	}
+	if(build_uri_str(&peer->peer_id, &node->uri, &to) < 0) {
+		LM_ERR("error building to string\n");
+		goto error;
+	}
+	req_uri = to;
+	
+	set_uac_req(&uac_r, &dmq_request_method, &str_hdr, body, NULL, TMCB_LOCAL_COMPLETED,
+			dmq_tm_callback, (void*)cb_param);
+	result = tmb.t_request(&uac_r, &req_uri,
+			       &to, &from,
+			       NULL);
+	if(result < 0) {
+		LM_ERR("error in tmb.t_request_within\n");
+		goto error;
+	}
+	pkg_free(str_hdr.s);
+	return 0;
+error:
+	pkg_free(str_hdr.s);
+	return -1;
+}
+
+/* pings the servers in the nodelist
+ * if the server does not reply to the ping, it is removed from the list
+ * the ping messages are actualy notification requests
+ * this way the ping will have two uses:
+ *   - checks if the servers in the list are up and running
+ *   - updates the list of servers from the other nodes
+ */
+void ping_servers(unsigned int ticks,void *param) {
+	str* body = build_notification_body();
+	int ret;
+	LM_DBG("ping_servers\n");
+	ret = bcast_dmq_message(dmq_notification_peer, body, notification_node, &notification_callback, 1);
+	pkg_free(body->s);
+	pkg_free(body);
+	if(ret < 0) {
+		LM_ERR("error broadcasting message\n");
+	}
+}

+ 28 - 0
modules_k/dmq/dmq_funcs.h

@@ -0,0 +1,28 @@
+#ifndef DMQ_FUNCS_H
+#define DMQ_FUNCS_H
+
+#include "../../str.h"
+#include "../../modules/tm/dlg.h"
+#include "../../modules/tm/tm_load.h"
+#include "../../config.h"
+#include "peer.h"
+#include "worker.h"
+#include "dmqnode.h"
+
+void ping_servers(unsigned int ticks,void *param);
+
+typedef struct dmq_resp_cback {
+	int (*f)(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
+	void* param;
+} dmq_resp_cback_t;
+
+typedef struct dmq_cback_param {
+	dmq_resp_cback_t resp_cback;
+	dmq_node_t* node;
+} dmq_cback_param_t;
+
+dmq_peer_t* register_dmq_peer(dmq_peer_t* peer);
+int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards);
+int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards);
+
+#endif

+ 248 - 0
modules_k/dmq/dmqnode.c

@@ -0,0 +1,248 @@
+#include "../../ut.h"
+#include "dmqnode.h"
+#include "dmq.h"
+
+dmq_node_t* self_node;
+dmq_node_t* notification_node;
+
+/* name */
+str dmq_node_status_str = str_init("status");
+/* possible values */
+str dmq_node_active_str = str_init("active");
+str dmq_node_disabled_str = str_init("disabled");
+str dmq_node_timeout_str = str_init("timeout");
+
+str* get_status_str(int status) {
+	switch(status) {
+		case DMQ_NODE_ACTIVE: {
+			return &dmq_node_active_str;
+		}
+		case DMQ_NODE_DISABLED: {
+			return &dmq_node_disabled_str;
+		}
+		case DMQ_NODE_TIMEOUT: {
+			return &dmq_node_timeout_str;
+		}
+		default: {
+			return 0;
+		}
+	}
+}
+
+dmq_node_list_t* init_dmq_node_list() {
+	dmq_node_list_t* node_list = shm_malloc(sizeof(dmq_node_list_t));
+	memset(node_list, 0, sizeof(dmq_node_list_t));
+	lock_init(&node_list->lock);
+	return node_list;
+}
+
+inline int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode) {
+	if(!node || !cmpnode) {
+		LM_ERR("cmp_dmq_node - null node received\n");
+		return -1;
+	}
+	return STR_EQ(node->uri.host, cmpnode->uri.host) &&
+	       STR_EQ(node->uri.port, cmpnode->uri.port);
+}
+
+static str* get_param_value(param_t* params, str* param) {
+	while (params) {
+		if ((params->name.len == param->len) &&
+		    (strncmp(params->name.s, param->s, param->len) == 0)) {
+			return &params->body;
+		}
+		params = params->next;
+	}
+	return NULL;
+}
+
+inline int set_dmq_node_params(dmq_node_t* node, param_t* params) {
+	str* status;
+	if(!params) {
+		LM_DBG("no parameters given\n");
+		return 0;
+	}
+	status = get_param_value(params, &dmq_node_status_str);
+	if(status) {
+		if(str_strcmp(status, &dmq_node_active_str)) {
+			node->status = DMQ_NODE_ACTIVE;
+		} else if(str_strcmp(status, &dmq_node_timeout_str)) {
+			node->status = DMQ_NODE_ACTIVE;
+		} else if(str_strcmp(status, &dmq_node_disabled_str)) {
+			node->status = DMQ_NODE_ACTIVE;
+		} else {
+			LM_ERR("invalid status parameter: %.*s\n", STR_FMT(status));
+			goto error;
+		}
+	}
+	return 0;
+error:
+	return -1;
+}
+
+inline int set_default_dmq_node_params(dmq_node_t* node) {
+	node->status = DMQ_NODE_ACTIVE;
+	return 0;
+}
+
+inline dmq_node_t* build_dmq_node(str* uri, int shm) {
+	dmq_node_t* ret;
+	param_hooks_t hooks;
+	param_t* params;
+	
+	LM_DBG("build_dmq_node %.*s with %s memory\n", STR_FMT(uri), shm?"shm":"private");
+	
+	if(shm) {
+		ret = shm_malloc(sizeof(*ret));
+		memset(ret, 0, sizeof(*ret));
+		shm_str_dup(&ret->orig_uri, uri);
+	} else {
+		ret = pkg_malloc(sizeof(*ret));
+		memset(ret, 0, sizeof(*ret));
+		pkg_str_dup(&ret->orig_uri, uri);
+	}
+	set_default_dmq_node_params(ret);
+	if(parse_uri(ret->orig_uri.s, ret->orig_uri.len, &ret->uri) < 0) {
+		LM_ERR("error parsing uri\n");
+		goto error;
+	}
+	/* if any parameters found, parse them */
+	if(parse_params(&ret->uri.params, CLASS_ANY, &hooks, &params) < 0) {
+		LM_ERR("error parsing params\n");
+		goto error;
+	}
+	/* if any params found */
+	if(params) {
+		if(shm) {
+			if(shm_duplicate_params(&ret->params, params) < 0) {
+				LM_ERR("error duplicating params\n");
+				free_params(params);
+				goto error;
+			}
+			free_params(params);
+		} else {
+			ret->params = params;
+		}
+		if(set_dmq_node_params(ret, ret->params) < 0) {
+			LM_ERR("error setting parameters\n");
+			goto error;
+		}
+	} else {
+		LM_DBG("no dmqnode params found\n");		
+	}
+	return ret;
+error:
+	return NULL;
+}
+
+inline dmq_node_t* find_dmq_node_uri(dmq_node_list_t* list, str* uri) {
+	dmq_node_t *ret, *find;
+	find =  build_dmq_node(uri, 0);
+	ret = find_dmq_node(list, find);
+	destroy_dmq_node(find, 0);
+	return ret;
+}
+
+inline void destroy_dmq_node(dmq_node_t* node, int shm) {
+	if(shm) {
+		shm_free_node(node);
+	} else {
+		pkg_free_node(node);
+	}
+}
+
+inline dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
+	dmq_node_t* cur = list->nodes;
+	while(cur) {
+		if(cmp_dmq_node(cur, node)) {
+			return cur;
+		}
+		cur = cur->next;
+	}
+	return NULL;
+}
+
+inline dmq_node_t* shm_dup_node(dmq_node_t* node) {
+	dmq_node_t* newnode = shm_malloc(sizeof(dmq_node_t));
+	memcpy(newnode, node, sizeof(dmq_node_t));
+	shm_str_dup(&newnode->orig_uri, &node->orig_uri);
+	if(parse_uri(newnode->orig_uri.s, newnode->orig_uri.len, &newnode->uri) < 0) {
+		LM_ERR("error in parsing node uri\n");
+		goto error;
+	}
+	return newnode;
+error:
+	shm_free(newnode->orig_uri.s);
+	shm_free(newnode);
+	return NULL;
+}
+
+inline void shm_free_node(dmq_node_t* node) {
+	shm_free(node->orig_uri.s);
+	shm_free(node);
+}
+
+inline void pkg_free_node(dmq_node_t* node) {
+	pkg_free(node->orig_uri.s);
+	pkg_free(node);
+}
+
+inline int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
+	dmq_node_t *cur, **prev;
+	lock_get(&list->lock);
+	cur = list->nodes;
+	prev = &list->nodes;
+	while(cur) {
+		if(cmp_dmq_node(cur, node)) {
+			*prev = cur->next;
+			shm_free_node(cur);
+			lock_release(&list->lock);
+			return 1;
+		}
+		prev = &cur->next;
+		cur = cur->next;
+	}
+	lock_release(&list->lock);
+	return 0;
+}
+
+inline dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri) {
+	dmq_node_t* newnode = build_dmq_node(uri, 1);
+	if(!newnode) {
+		LM_ERR("error creating node\n");
+		goto error;
+	}
+	LM_DBG("dmq node successfully created\n");
+	lock_get(&list->lock);
+	newnode->next = list->nodes;
+	list->nodes = newnode;
+	list->count++;
+	lock_release(&list->lock);
+	return newnode;
+error:
+	return NULL;
+}
+
+int build_node_str(dmq_node_t* node, char* buf, int buflen) {
+	/* sip:host:port;status=[status] */
+	int len = 0;
+	if(buflen < node->orig_uri.len + 32) {
+		LM_ERR("no more space left for node string\n");
+		return -1;
+	}
+	memcpy(buf + len, "sip:", 4);
+	len += 4;
+	memcpy(buf + len, node->uri.host.s, node->uri.host.len);
+	len += node->uri.host.len;
+	memcpy(buf + len, ":", 1);
+	len += 1;
+	memcpy(buf + len, node->uri.port.s, node->uri.port.len);
+	len += node->uri.port.len;
+	memcpy(buf + len, ";", 1);
+	len += 1;
+	memcpy(buf + len, "status=", 7);
+	len += 7;
+	memcpy(buf + len, get_status_str(node->status)->s, get_status_str(node->status)->len);
+	len += get_status_str(node->status)->len;
+	return len;
+}

+ 57 - 0
modules_k/dmq/dmqnode.h

@@ -0,0 +1,57 @@
+#ifndef DMQNODE_H
+#define DMQNODE_H
+
+#include <string.h>
+#include <stdlib.h>
+#include "../../lock_ops.h"
+#include "../../str.h"
+#include "../../mem/mem.h"
+#include "../../mem/shm_mem.h"
+#include "../../parser/parse_uri.h"
+#include "../../parser/parse_param.h"
+
+#define NBODY_LEN		1024
+#define DMQ_NODE_ACTIVE		1 << 1
+#define DMQ_NODE_TIMEOUT	1 << 2
+#define DMQ_NODE_DISABLED	1 << 3
+
+typedef struct dmq_node {
+	int local; /* local type set means the dmq dmqnode == self */
+	str orig_uri; /* original uri string - e.g. sip:127.0.0.1:5060;passive=true */
+	struct sip_uri uri; /* parsed uri string */
+	param_t* params; /* uri parameters */
+	int status; /* reserved - maybe something like active,timeout,disabled */
+	int last_notification; /* last notificatino receied from the node */
+	struct dmq_node* next; /* pointer to the next struct dmq_node */
+} dmq_node_t;
+
+typedef struct dmq_node_list {
+	gen_lock_t lock; /* lock for the list - must acquire before manipulating it */
+	struct dmq_node* nodes; /* the nodes in the list */
+	int count; /* the number of nodes in the list */
+} dmq_node_list_t;
+
+extern str dmq_node_status_str;
+extern dmq_node_list_t* node_list;
+
+dmq_node_list_t* init_dmq_node_list();
+dmq_node_t* build_dmq_node(str* uri, int shm);
+int update_node_list(dmq_node_list_t* remote_list);
+dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri);
+dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node);
+dmq_node_t* find_dmq_node_uri(dmq_node_list_t* list, str* uri);
+int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node);
+int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode);
+dmq_node_t* shm_dup_node(dmq_node_t* node);
+void destroy_dmq_node(dmq_node_t* node, int shm);
+void shm_free_node(dmq_node_t* node);
+void pkg_free_node(dmq_node_t* node);
+int set_dmq_node_params(dmq_node_t* node, param_t* params);
+
+str* get_status_str(int status);
+int build_node_str(dmq_node_t* node, char* buf, int buflen);
+
+extern dmq_node_t* self_node;
+extern dmq_node_t* notification_node;	
+
+#endif

+ 4 - 0
modules_k/dmq/doc/Makefile

@@ -0,0 +1,4 @@
+docs = dmq.xml
+
+docbook_dir = ../../../docbook
+include $(docbook_dir)/Makefile.module

+ 44 - 0
modules_k/dmq/doc/dmq.xml

@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+
+<book xmlns:xi="http://www.w3.org/2001/XInclude">
+    <bookinfo>
+	<title>Distributed Message Queue Module</title>
+	<productname class="trade">&kamailioname;</productname>
+	<authorgroup>
+	    <author>
+		<firstname>Marius Ovidiu</firstname>
+		<surname>Bucur</surname>
+		<address>
+		<email>[email protected]</email>
+		</address>
+	    </author>
+	    <editor>
+		<firstname>Marius Ovidiu</firstname>
+		<surname>Bucur</surname>
+		<address>
+		    <email>[email protected]</email>
+		</address>
+	    </editor>
+	</authorgroup>
+	<copyright>
+	    <year>2011</year>
+	    <holder>Marius Bucur</holder>
+	</copyright>
+  </bookinfo>
+    <toc></toc>
+    
+    <xi:include href="dmq_admin.xml"/>
+    <xi:include href="dmq_devel.xml"/>
+    
+</book>
+
+

+ 103 - 0
modules_k/dmq/doc/dmq_admin.xml

@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+<!-- Module User's Guide -->
+
+<chapter>
+	<title>&adminguide;</title>
+	
+	<section>
+	<title>Overview</title>
+	<para> The DMQ module implements a distributed message passing system on top of Kamailio.
+	The DMQ nodes within the system are grouped in a logical entity called DMQ bus and are able to
+	communicate with each others by sending/receiving messages (either by broadcast or sending a DMQ
+	message to a specific node).
+	The system transparently deals with node discovery, node consistency within the DMQ bus, retransmissions,
+	etc.
+	</para>
+	
+	</section>
+
+	<section>
+	<title>Dependencies</title>
+	<section>
+		<title>&kamailio; Modules</title>
+		<para>
+		The following modules must be loaded before this module:
+			<itemizedlist>
+			<listitem>
+			<para>
+				<emphasis>sl</emphasis>.
+			</para>
+			</listitem>
+			<listitem>
+			<para>
+				<emphasis>tm</emphasis>.
+			</para>
+			</listitem>
+			</itemizedlist>
+		</para>
+	</section>
+
+	<section>
+		<title>External Libraries or Applications</title>
+		<itemizedlist>
+			<listitem>
+			<para>
+				<emphasis>
+				Each peer needs to use its own serialization mechanism.
+				Some examples are libtpl, protobuf.
+				</emphasis>.
+			</para>
+			</listitem>
+		</itemizedlist>
+	</section>
+	
+	<title>Exported Parameters</title>
+	<section>
+		<title><varname>dmq_server_address</varname>(str)</title>
+		<para>
+		The local server address.
+		</para>
+		<para>
+		The modules needs it to know on which interface the DMQ engine should send and receive messages.
+		</para>
+		<para>
+		<emphasis>Default value is <quote>NULL</quote>.	
+		</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>dmq_server_address</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("dmq", "dmq_server_address", "&defaultdb;")
+...
+</programlisting>
+		</example>
+	</section>
+	<section>
+		<title><varname>dmq_notification_address</varname>(str)</title>
+		<para>
+		The address of the DMQ node from which the local node should retrieve initial information.
+		</para>
+		<para>
+		<emphasis>	Default value is <quote>NULL</quote>.
+		</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>dmq_notification_address</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("dmq", "dmq_notification_address", "&defaultdb;")
+...
+</programlisting>
+		</example>
+	</section>
+</chapter>
+

+ 45 - 0
modules_k/dmq/doc/dmq_devel.xml

@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+<!-- Module Developer's Guide -->
+
+<chapter>
+    <title>&develguide;</title>
+    <para>
+		The module provides the following functions that can be used
+		in other &kamailio; modules.
+   </para>
+ 		<section>
+				<title>
+				<function moreinfo="none">dmq_load_api(dmq_api_t* api)</function>
+				</title>
+			<para>
+				This function binds the dmq modules and fills the structure 
+				with the exported functions
+				-> register_dmq_peer - registers an entity as a DMQ peer which permits receiving/sending
+				messages between nodes which support the same peer,
+				-> bcast_message - broadcast a DMQ message to all peers available in the DMQ bus,
+				-> send_message - sends a DMQ message to a specific peer in the local DMQ bus.
+			</para>
+		<example>
+		<title><function>dmq_api_t</function> structure</title>
+	<programlisting format="linespecific">
+...
+typedef struct dmq_api {
+	register_dmq_peer_t register_dmq_peer;
+	bcast_message_t bcast_message;
+	send_message_t send_message;
+} dmq_api_t;
+...
+</programlisting>
+		</example>
+
+		</section>
+</chapter>
+

+ 50 - 0
modules_k/dmq/message.c

@@ -0,0 +1,50 @@
+#include "../../parser/parse_to.h"
+#include "../../parser/parse_uri.h"
+#include "../../sip_msg_clone.h"
+#include "../../parser/parse_content.h"
+#include "../../parser/parse_from.h"
+#include "../../ut.h"
+#include "dmq.h"
+#include "worker.h"
+#include "peer.h"
+#include "message.h"
+
+str dmq_200_rpl  = str_init("OK");
+str dmq_400_rpl  = str_init("Bad request");
+str dmq_500_rpl  = str_init("Server Internal Error");
+str dmq_404_rpl  = str_init("User Not Found");
+
+int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
+	dmq_peer_t* peer;
+	struct sip_msg* cloned_msg = NULL;
+	int cloned_msg_len;
+	if ((parse_sip_msg_uri(msg) < 0) || (!msg->parsed_uri.user.s)) {
+			LM_ERR("error parsing msg uri\n");
+			goto error;
+	}
+	LM_DBG("handle_dmq_message [%.*s %.*s] [%s %s]\n",
+	       msg->first_line.u.request.method.len, msg->first_line.u.request.method.s,
+	       msg->first_line.u.request.uri.len, msg->first_line.u.request.uri.s,
+	       ZSW(str1), ZSW(str2));
+	/* the peer id is given as the userinfo part of the request URI */
+	peer = find_peer(msg->parsed_uri.user);
+	if(!peer) {
+		LM_DBG("no peer found for %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+		if(slb.freply(msg, 404, &dmq_404_rpl) < 0)
+		{
+			LM_ERR("sending reply\n");
+			goto error;
+		}
+		return 0;
+	}
+	LM_DBG("handle_dmq_message peer found: %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+	cloned_msg = sip_msg_shm_clone(msg, &cloned_msg_len, 1);
+	if(!cloned_msg) {
+		LM_ERR("error cloning sip message\n");
+		goto error;
+	}
+	add_dmq_job(cloned_msg, peer);
+	return 0;
+error:
+	return -1;
+}

+ 2 - 0
modules_k/dmq/message.h

@@ -0,0 +1,2 @@
+
+int handle_dmq_message(struct sip_msg*, char*, char*);

+ 220 - 0
modules_k/dmq/notification_peer.c

@@ -0,0 +1,220 @@
+#include "notification_peer.h"
+
+static str notification_content_type = str_init("text/plain");
+dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
+
+int add_notification_peer() {
+	dmq_peer_t not_peer;
+	not_peer.callback = dmq_notification_callback;
+	not_peer.description.s = "notification_peer";
+	not_peer.description.len = 17;
+	not_peer.peer_id.s = "notification_peer";
+	not_peer.peer_id.len = 17;
+	dmq_notification_peer = register_dmq_peer(&not_peer);
+	if(!dmq_notification_peer) {
+		LM_ERR("error in register_dmq_peer\n");
+		goto error;
+	}
+	/* add itself to the node list */
+	self_node = add_dmq_node(node_list, &dmq_server_address);
+	if(!self_node) {
+		LM_ERR("error adding self node\n");
+		goto error;
+	}
+	/* local node - only for self */
+	self_node->local = 1;
+	return 0;
+error:
+	return -1;
+}
+
+dmq_node_t* add_server_and_notify(str* server_address) {
+	/* add the notification server to the node list - if any */
+	dmq_node_t* node = add_dmq_node(node_list, server_address);
+	if(!node) {
+		LM_ERR("error adding notification node\n");
+		goto error;
+	}
+	/* request initial list from the notification server */
+	if(request_nodelist(node, 2) < 0) {
+		LM_ERR("error requesting initial nodelist\n");
+		goto error;
+	}
+	return node;
+error:
+	return NULL;
+}
+
+/**
+ * extract the node list from the body of a notification request SIP message
+ * the SIP request will look something like:
+ * 	KDMQ sip:10.0.0.0:5062
+ * 	To: ...
+ * 	From: ...
+ * 	Max-Forwards: ...
+ * 	Content-Length: 22
+ * 	
+ * 	sip:host1:port1;param1=value1
+ * 	sip:host2:port2;param2=value2
+ * 	...
+ */
+int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg) {
+	int content_length, total_nodes = 0;
+	str body;
+	str tmp_uri;
+	dmq_node_t *cur = NULL;
+	char *tmp, *end, *match;
+	if(!msg->content_length) {
+		LM_ERR("no content length header found\n");
+		return -1;
+	}
+	content_length = get_content_length(msg);
+	if(!content_length) {
+		LM_DBG("content length is 0\n");
+		return total_nodes;
+	}
+	body.s = get_body(msg);
+	body.len = content_length;
+	tmp = body.s;
+	end = body.s + body.len;
+	
+	/* acquire big list lock */
+	lock_get(&update_list->lock);
+	while(tmp < end) {
+		match = q_memchr(tmp, '\n', end - tmp);
+		if(match) {
+			match++;
+		} else {
+			/* for the last line - take all of it */
+			match = end;
+		}
+		/* create the orig_uri from the parsed uri line and trim it */
+		tmp_uri.s = tmp;
+		tmp_uri.len = match - tmp - 1;
+		tmp = match;
+		/* trim the \r, \n and \0's */
+		trim_r(tmp_uri);
+		if(!find_dmq_node_uri(update_list, &tmp_uri)) {
+			LM_DBG("found new node %.*s\n", STR_FMT(&tmp_uri));
+			cur = build_dmq_node(&tmp_uri, 1);
+			if(!cur) {
+				LM_ERR("error creating new dmq node\n");
+				goto error;
+			}
+			cur->next = update_list->nodes;
+			update_list->nodes = cur;
+			update_list->count++;
+			total_nodes++;
+		}
+	}
+	/* release big list lock */
+	lock_release(&update_list->lock);
+	return total_nodes;
+error:
+	lock_release(&update_list->lock);
+	return -1;
+}
+
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
+	int nodes_recv;
+	str* response_body = NULL;
+	unsigned int maxforwards = 1;
+	/* received dmqnode list */
+	LM_DBG("dmq triggered from dmq_notification_callback\n");
+	/* parse the message headers */
+	if(parse_headers(msg, HDR_EOH_F, 0) < 0) {
+		LM_ERR("error parsing message headers\n");
+		goto error;
+	}
+	
+	/* extract the maxforwards value, if any */
+	if(msg->maxforwards) {
+		LM_DBG("max forwards: %.*s\n", STR_FMT(&msg->maxforwards->body));
+		str2int(&msg->maxforwards->body, &maxforwards);
+	}
+	maxforwards--;
+	
+	nodes_recv = extract_node_list(node_list, msg);
+	LM_DBG("received %d new nodes\n", nodes_recv);
+	response_body = build_notification_body();
+	resp->content_type = notification_content_type;
+	resp->reason = dmq_200_rpl;
+	resp->body = *response_body;
+	resp->resp_code = 200;
+	
+	/* if we received any new nodes tell about them to the others */
+	if(nodes_recv > 0 && maxforwards > 0) {
+		/* maxforwards is set to 0 so that the message is will not be in a spiral */
+		bcast_dmq_message(dmq_notification_peer, response_body, 0, &notification_callback, maxforwards);
+	}
+	LM_DBG("broadcasted message\n");
+	pkg_free(response_body);
+	return 0;
+error:
+	return -1;
+}
+
+/**
+ * builds the body of a notification message from the list of servers 
+ * the result will look something like:
+ * sip:host1:port1;param1=value1\r\n
+ * sip:host2:port2;param2=value2\r\n
+ * sip:host3:port3;param3=value3
+ */
+str* build_notification_body() {
+	/* the length of the current line describing the server */
+	int slen;
+	/* the current length of the body */
+	int clen = 0;
+	dmq_node_t* cur_node = NULL;
+	str* body;
+	body = pkg_malloc(sizeof(str));
+	memset(body, 0, sizeof(str));
+	/* we allocate a chunk of data for the body */
+	body->len = NBODY_LEN;
+	body->s = pkg_malloc(body->len);
+	/* we add each server to the body - each on a different line */
+	lock_get(&node_list->lock);
+	cur_node = node_list->nodes;
+	while(cur_node) {
+		LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
+		/* body->len - clen - 2 bytes left to write - including the \r\n */
+		slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2);
+		if(slen < 0) {
+			LM_ERR("cannot build_node_string\n");
+			goto error;
+		}
+		clen += slen;
+		body->s[clen++] = '\r';
+		body->s[clen++] = '\n';
+		cur_node = cur_node->next;
+	}
+	lock_release(&node_list->lock);
+	body->len = clen;
+	return body;
+error:
+	pkg_free(body->s);
+	pkg_free(body);
+	return NULL;
+}
+
+int request_nodelist(dmq_node_t* node, int forward) {
+	str* body = build_notification_body();
+	int ret;
+	ret = send_dmq_message(dmq_notification_peer, body, node, &notification_callback, forward);
+	pkg_free(body->s);
+	pkg_free(body);
+	return ret;
+}
+
+int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param) {
+	int ret;
+	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
+	if(code == 408) {
+		/* deleting node - the server did not respond */
+		LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri));
+		ret = del_dmq_node(node_list, node);
+		LM_DBG("del_dmq_node returned %d\n", ret);
+	}
+	return 0;
+}

+ 26 - 0
modules_k/dmq/notification_peer.h

@@ -0,0 +1,26 @@
+#include "../../parser/msg_parser.h"
+#include "../../parser/parse_content.h"
+#include "../../ut.h"
+#include "dmq.h"
+#include "dmqnode.h"
+#include "peer.h"
+#include "dmq_funcs.h"
+
+int add_notification_peer();
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
+int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg);
+str* build_notification_body();
+int build_node_str(dmq_node_t* node, char* buf, int buflen);
+/* request a nodelist from a server
+ * this is acomplished by a KDMQ request
+ * KDMQ notification@server:port
+ * node - the node to send to
+ * forward - flag that tells if the node receiving the message is allowed to 
+ *           forward the request to its own list
+ */
+int request_nodelist(dmq_node_t* node, int forward);
+dmq_node_t* add_server_and_notify(str* server_address);
+
+/* helper functions */
+extern int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
+extern dmq_resp_cback_t notification_callback;

+ 44 - 0
modules_k/dmq/peer.c

@@ -0,0 +1,44 @@
+#include "peer.h"
+#include "dmq.h"
+
+dmq_peer_list_t* init_peer_list() {
+	dmq_peer_list_t* peer_list = shm_malloc(sizeof(dmq_peer_list_t));
+	memset(peer_list, 0, sizeof(dmq_peer_list_t));
+	lock_init(&peer_list->lock);
+	return peer_list;
+}
+
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+	dmq_peer_t* cur = peer_list->peers;
+	int len;
+	while(cur) {
+		/* len - the minimum length of the two strings */
+		len = cur->peer_id.len < peer->peer_id.len ? cur->peer_id.len:peer->peer_id.len;
+		if(strncasecmp(cur->peer_id.s, peer->peer_id.s, len) == 0) {
+			return cur;
+		}
+		cur = cur->next;
+	}
+	return 0;
+}
+
+dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+	dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t));
+	*new_peer = *peer;
+	
+	/* copy the str's */
+	new_peer->peer_id.s = shm_malloc(peer->peer_id.len);
+	memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len);
+	new_peer->description.s = shm_malloc(peer->description.len);
+	memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len);
+	
+	new_peer->next = peer_list->peers;
+	peer_list->peers = new_peer;
+	return new_peer;
+}
+
+dmq_peer_t* find_peer(str peer_id) {
+	dmq_peer_t foo_peer;
+	foo_peer.peer_id = peer_id;
+	return search_peer_list(peer_list, &foo_peer);
+}

+ 44 - 0
modules_k/dmq/peer.h

@@ -0,0 +1,44 @@
+#ifndef PEER_H
+#define PEER_H
+
+#include <string.h>
+#include <stdlib.h>
+#include "../../lock_ops.h"
+#include "../../str.h"
+#include "../../mem/mem.h"
+#include "../../mem/shm_mem.h"
+#include "../../parser/msg_parser.h"
+
+typedef struct peer_response {
+	int resp_code;
+	str content_type;
+	str reason;
+	str body;
+} peer_reponse_t;
+
+typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp);
+
+typedef struct dmq_peer {
+	str peer_id;
+	str description;
+	peer_callback_t callback;
+	struct dmq_peer* next;
+} dmq_peer_t;
+
+typedef struct dmq_peer_list {
+	gen_lock_t lock;
+	dmq_peer_t* peers;
+	int count;
+} dmq_peer_list_t;
+
+extern dmq_peer_list_t* peer_list;
+
+dmq_peer_list_t* init_peer_list();
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
+typedef dmq_peer_t* (*register_dmq_peer_t)(dmq_peer_t*);
+
+dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
+dmq_peer_t* find_peer(str peer_id);
+
+
+#endif

+ 185 - 0
modules_k/dmq/worker.c

@@ -0,0 +1,185 @@
+#include "dmq.h"
+#include "peer.h"
+#include "worker.h"
+#include "../../data_lump_rpl.h"
+#include "../../mod_fix.h"
+
+/* set the body of a response */
+static int set_reply_body(struct sip_msg* msg, str* body, str* content_type)
+{
+	char* buf;
+	int len;
+	int value_len;
+	str nb = *body;
+	str nc = *content_type;
+
+	/* add content-type */
+	value_len = nc.len;
+	len=sizeof("Content-Type: ") - 1 + value_len + CRLF_LEN;
+	buf=pkg_malloc(sizeof(char)*(len));
+
+	if (buf==0) {
+		LM_ERR("out of pkg memory\n");
+		return -1;
+	}
+	memcpy(buf, "Content-Type: ", sizeof("Content-Type: ") - 1);
+	memcpy(buf+sizeof("Content-Type: ") - 1, nc.s, value_len);
+	memcpy(buf+sizeof("Content-Type: ") - 1 + value_len, CRLF, CRLF_LEN);
+	if (add_lump_rpl(msg, buf, len, LUMP_RPL_HDR) == 0) {
+		LM_ERR("failed to insert content-type lump\n");
+		pkg_free(buf);
+		return -1;
+	}
+	pkg_free(buf);
+
+	/* add body */
+	if (add_lump_rpl(msg, nb.s, nb.len, LUMP_RPL_BODY) == 0) {
+		LM_ERR("cannot add body lump\n");
+		return -1;
+	}
+		
+	return 1;
+}
+
+void worker_loop(int id) {
+	dmq_worker_t* worker = &workers[id];
+	dmq_job_t* current_job;
+	peer_reponse_t peer_response;
+	int ret_value;
+	for(;;) {
+		LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
+		lock_get(&worker->lock);
+		LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
+		/* multiple lock_release calls might be performed, so remove from queue until empty */
+		do {
+			/* fill the response with 0's */
+			memset(&peer_response, 0, sizeof(peer_response));
+			current_job = job_queue_pop(worker->queue);
+			/* job_queue_pop might return NULL if queue is empty */
+			if(current_job) {
+				ret_value = current_job->f(current_job->msg, &peer_response);
+				if(ret_value < 0) {
+					LM_ERR("running job failed\n");
+					continue;
+				}
+				/* add the body to the reply */
+				if(peer_response.body.s) {
+					if(set_reply_body(current_job->msg, &peer_response.body, &peer_response.content_type) < 0) {
+						LM_ERR("error adding lumps\n");
+						continue;
+					}
+				}
+				/* send the reply */
+				if(slb.freply(current_job->msg, peer_response.resp_code, &peer_response.reason) < 0)
+				{
+					LM_ERR("error sending reply\n");
+				}
+				
+				/* if body given, free the lumps and free the body */
+				if(peer_response.body.s) {
+					del_nonshm_lump_rpl(&current_job->msg->reply_lump);
+					pkg_free(peer_response.body.s);
+				}
+				LM_DBG("sent reply\n");
+				shm_free(current_job->msg);
+				shm_free(current_job);
+				worker->jobs_processed++;
+			}
+		} while(job_queue_size(worker->queue) > 0);
+	}
+}
+
+int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
+	int i, found_available = 0;
+	dmq_job_t new_job;
+	dmq_worker_t* worker;
+	new_job.f = peer->callback;
+	new_job.msg = msg;
+	new_job.orig_peer = peer;
+	if(!num_workers) {
+		LM_ERR("error in add_dmq_job: no workers spawned\n");
+		return -1;
+	}
+	/* initialize the worker with the first one */
+	worker = workers;
+	/* search for an available worker, or, if not possible, for the least busy one */
+	for(i = 0; i < num_workers; i++) {
+		if(job_queue_size(workers[i].queue) == 0) {
+			worker = &workers[i];
+			found_available = 1;
+			break;
+		} else if(job_queue_size(workers[i].queue) < job_queue_size(worker->queue)) {
+			worker = &workers[i];
+		}
+	}
+	if(!found_available) {
+		LM_DBG("no available worker found, passing job to the least busy one [%d %d]\n",
+		       worker->pid, job_queue_size(worker->queue));
+	}
+	job_queue_push(worker->queue, &new_job);
+	lock_release(&worker->lock);
+	return 0;
+}
+
+void init_worker(dmq_worker_t* worker) {
+	memset(worker, 0, sizeof(*worker));
+	lock_init(&worker->lock);
+	// acquire the lock for the first time - so that dmq_worker_loop blocks
+	lock_get(&worker->lock);
+	worker->queue = alloc_job_queue();
+}
+
+job_queue_t* alloc_job_queue() {
+	job_queue_t* queue = shm_malloc(sizeof(job_queue_t));
+	atomic_set(&queue->count, 0);
+	queue->front = NULL;
+	queue->back = NULL;
+	lock_init(&queue->lock);
+	return queue;
+}
+
+void destroy_job_queue(job_queue_t* queue) {
+	shm_free(queue);
+}
+
+int job_queue_size(job_queue_t* queue) {
+	return atomic_get(&queue->count);
+}
+
+void job_queue_push(job_queue_t* queue, dmq_job_t* job) {
+	/* we need to copy the dmq_job into a newly created dmq_job in shm */
+	dmq_job_t* newjob = shm_malloc(sizeof(dmq_job_t));
+	*newjob = *job;
+	
+	lock_get(&queue->lock);
+	newjob->prev = NULL;
+	newjob->next = queue->back;
+	if(queue->back) {
+		queue->back->prev = newjob;
+	}
+	queue->back = newjob;
+	if(!queue->front) {
+		queue->front = newjob;
+	}
+	atomic_inc(&queue->count);
+	lock_release(&queue->lock);
+}
+dmq_job_t* job_queue_pop(job_queue_t* queue) {
+	dmq_job_t* front;
+	lock_get(&queue->lock);
+	if(!queue->front) {
+		lock_release(&queue->lock);
+		return NULL;
+	}
+	front = queue->front;
+	if(front->prev) {
+		queue->front = front->prev;
+		front->prev->next = NULL;
+	} else {
+		queue->front = NULL;
+		queue->back = NULL;
+	}
+	atomic_dec(&queue->count);
+	lock_release(&queue->lock);
+	return front;
+}

+ 43 - 0
modules_k/dmq/worker.h

@@ -0,0 +1,43 @@
+#ifndef DMQ_WORKER_H
+#define DMQ_WORKER_H
+
+#include "peer.h"
+#include "../../locking.h"
+#include "../../atomic_ops.h"
+#include "../../parser/msg_parser.h"
+
+typedef struct dmq_job {
+	peer_callback_t f;
+	struct sip_msg* msg;
+	dmq_peer_t* orig_peer;
+	struct dmq_job* next;
+	struct dmq_job* prev;
+} dmq_job_t;
+
+typedef struct job_queue {
+	atomic_t count;
+	struct dmq_job* back;
+	struct dmq_job* front;
+	gen_lock_t lock;
+} job_queue_t;
+
+struct dmq_worker {
+	job_queue_t* queue;
+	int jobs_processed;
+	gen_lock_t lock;
+	int pid;
+};
+
+typedef struct dmq_worker dmq_worker_t;
+
+void init_worker(dmq_worker_t* worker);
+int add_dmq_job(struct sip_msg*, dmq_peer_t*);
+void worker_loop(int id);
+
+job_queue_t* alloc_job_queue();
+void destroy_job_queue(job_queue_t* queue);
+void job_queue_push(job_queue_t* queue, dmq_job_t* job);
+dmq_job_t* job_queue_pop(job_queue_t* queue);
+int job_queue_size(job_queue_t* queue);
+
+#endif