浏览代码

dmq: many safety checks for mem mallocs and function return codes

- added license header in the files
Daniel-Constantin Mierla 12 年之前
父节点
当前提交
1977645ceb

+ 27 - 0
modules/dmq/bind_dmq.c

@@ -1,8 +1,35 @@
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
 #include "dmq.h"
 #include "bind_dmq.h"
 #include "peer.h"
 #include "dmq_funcs.h"
 
+/**
+ * @brief bind dmq module api
+ */
 int bind_dmq(dmq_api_t* api) {
 	api->register_dmq_peer = register_dmq_peer;
 	api->send_message = dmq_send_message;

+ 31 - 4
modules/dmq/bind_dmq.h

@@ -1,12 +1,39 @@
-#ifndef BIND_DMQ_H
-#define BIND_DMQ_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _BIND_DMQ_H_
+#define _BIND_DMQ_H_
 
 #include "peer.h"
 #include "dmqnode.h"
 #include "dmq_funcs.h"
 
-typedef int (*bcast_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards);
-typedef int (*send_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards);
+typedef int (*bcast_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* except,
+		dmq_resp_cback_t* resp_cback, int max_forwards);
+typedef int (*send_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* node,
+		dmq_resp_cback_t* resp_cback, int max_forwards);
 
 typedef struct dmq_api {
 	register_dmq_peer_t register_dmq_peer;

+ 32 - 14
modules/dmq/dmq.c

@@ -21,10 +21,8 @@
  * along with this program; if not, write to the Free Software 
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  *
- * History:
- * --------
- *  2010-03-29  initial version (mariusbucur)
  */
+
 #include <stdio.h>
 #include <string.h>
 #include <stdlib.h>
@@ -135,7 +133,8 @@ struct module_exports exports = {
 /**
  * init module function
  */
-static int mod_init(void) {
+static int mod_init(void)
+{
 	
 	if(register_mi_mod(exports.name, mi_cmds)!=0) {
 		LM_ERR("failed to register MI commands\n");
@@ -153,12 +152,20 @@ static int mod_init(void) {
 		LM_ERR("can't load tm functions. TM module probably not loaded\n");
 		return -1;
 	}
+
 	/* load peer list - the list containing the module callbacks for dmq */
-	
 	peer_list = init_peer_list();
+	if(peer_list==NULL) {
+		LM_ERR("cannot initialize peer list\n");
+		return -1;
+	}
 	
 	/* load the dmq node list - the list containing the dmq servers */
 	node_list = init_dmq_node_list();
+	if(node_list==NULL) {
+		LM_ERR("cannot initialize node list\n");
+		return -1;
+	}
 	
 	/* register worker processes - add one because of the ping process */
 	register_procs(num_workers);
@@ -169,7 +176,8 @@ static int mod_init(void) {
 		return -1;
 	}
 	
-	if(parse_server_address(&dmq_notification_address, &dmq_notification_uri) < 0) {
+	if(parse_server_address(&dmq_notification_address,
+				&dmq_notification_uri) < 0) {
 		LM_ERR("notification address invalid\n");
 		return -1;
 	}
@@ -182,10 +190,13 @@ static int mod_init(void) {
 	}
 	
 	/**
-         * add the dmq notification peer.
+	 * add the dmq notification peer.
 	 * the dmq is a peer itself so that it can receive node notifications
 	 */
-	add_notification_peer();
+	if(add_notification_peer()<0) {
+		LM_ERR("cannot add notification peer\n");
+		return -1;
+	}
 	
 	startup_time = (int) time(NULL);
 	
@@ -196,7 +207,10 @@ static int mod_init(void) {
 	if(ping_interval < MIN_PING_INTERVAL) {
 		ping_interval = MIN_PING_INTERVAL;
 	}
-	register_timer(ping_servers, 0, ping_interval);
+	if(register_timer(ping_servers, 0, ping_interval)<0) {
+		LM_ERR("cannot register timer callback\n");
+		return -1;
+	}
 	
 	return 0;
 }
@@ -204,7 +218,8 @@ static int mod_init(void) {
 /**
  * initialize children
  */
-static int child_init(int rank) {
+static int child_init(int rank)
+{
   	int i, newpid;
 	if (rank == PROC_MAIN) {
 		/* fork worker processes */
@@ -216,7 +231,7 @@ static int child_init(int rank) {
 				LM_ERR("failed to form process\n");
 				return -1;
 			} else if(newpid == 0) {
-				// child - this will loop forever
+				/* child - this will loop forever */
 				worker_loop(i);
 			} else {
 				workers[i].pid = newpid;
@@ -259,15 +274,18 @@ static void destroy(void) {
 	}
 }
 
-static int handle_dmq_fixup(void** param, int param_no) {
+static int handle_dmq_fixup(void** param, int param_no)
+{
  	return 0;
 }
 
-static int send_dmq_fixup(void** param, int param_no) {
+static int send_dmq_fixup(void** param, int param_no)
+{
 	return fixup_spve_null(param, 1);
 }
 
-static int parse_server_address(str* uri, struct sip_uri* parsed_uri) {
+static int parse_server_address(str* uri, struct sip_uri* parsed_uri)
+{
 	if(!uri->s) {
 		goto empty;
 	}

+ 27 - 2
modules/dmq/dmq.h

@@ -1,5 +1,30 @@
-#ifndef DMQ_H
-#define DMQ_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _DMQ_H_
+#define _DMQ_H_
 
 #include "../../dprint.h"
 #include "../../error.h"

+ 104 - 24
modules/dmq/dmq_funcs.c

@@ -1,7 +1,35 @@
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
 #include "dmq_funcs.h"
 #include "notification_peer.h"
 
-dmq_peer_t* register_dmq_peer(dmq_peer_t* peer) {
+/**
+ * @brief register a DMQ peer
+ */
+dmq_peer_t* register_dmq_peer(dmq_peer_t* peer)
+{
 	dmq_peer_t* new_peer;
 	lock_get(&peer_list->lock);
 	if(search_peer_list(peer_list, peer)) {
@@ -15,8 +43,18 @@ dmq_peer_t* register_dmq_peer(dmq_peer_t* peer) {
 	return new_peer;
 }
 
-void dmq_tm_callback(struct cell *t, int type, struct tmcb_params *ps) {
-	dmq_cback_param_t* cb_param = (dmq_cback_param_t*)(*ps->param);
+/**
+ * @brief dmq tm callback
+ */
+void dmq_tm_callback(struct cell *t, int type, struct tmcb_params *ps)
+{
+	dmq_cback_param_t* cb_param;
+	
+	cb_param = (dmq_cback_param_t*)(*ps->param);
+
+	if(cb_param==NULL)
+		return;
+
 	LM_DBG("dmq_tm_callback start\n");
 	if(cb_param->resp_cback.f) {
 		if(cb_param->resp_cback.f(ps->rpl, ps->code, cb_param->node, cb_param->resp_cback.param) < 0) {
@@ -26,11 +64,15 @@ void dmq_tm_callback(struct cell *t, int type, struct tmcb_params *ps) {
 	LM_DBG("dmq_tm_callback done\n");
 	shm_free_node(cb_param->node);
 	shm_free(cb_param);
+	*ps->param = NULL;
 }
 
-int build_uri_str(str* username, struct sip_uri* uri, str* from) {
+int build_uri_str(str* username, struct sip_uri* uri, str* from)
+{
 	/* sip:user@host:port */
-	int from_len = username->len + uri->host.len + uri->port.len + 10;
+	int from_len;
+	
+	from_len = username->len + uri->host.len + uri->port.len + 10;
 	if(!uri->host.s || !uri->host.len) {
 		LM_ERR("no host in uri\n");
 		return -1;
@@ -40,6 +82,10 @@ int build_uri_str(str* username, struct sip_uri* uri, str* from) {
 		return -1;
 	}
 	from->s = pkg_malloc(from_len);
+	if(from->s==NULL) {
+		LM_ERR("no more pkg\n");
+		return -1;
+	}
 	from->len = 0;
 	
 	memcpy(from->s, "sip:", 4);
@@ -63,13 +109,17 @@ int build_uri_str(str* username, struct sip_uri* uri, str* from) {
 	return 0;
 }
 
-/* broadcast a dmq message
+/**
+ * @brief broadcast a dmq message
+ *
  * peer - the peer structure on behalf of which we are sending
  * body - the body of the message
  * except - we do not send the message to this node
  * resp_cback - a response callback that gets called when the transaction is complete
  */
-int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards) {
+int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except,
+		dmq_resp_cback_t* resp_cback, int max_forwards)
+{
 	dmq_node_t* node;
 	
 	lock_get(&node_list->lock);
@@ -80,7 +130,8 @@ int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_
 		 *   - itself
 		 *   - any inactive nodes
 		 */
-		if((except && cmp_dmq_node(node, except)) || node->local || node->status != DMQ_NODE_ACTIVE) {
+		if((except && cmp_dmq_node(node, except)) || node->local
+				|| node->status != DMQ_NODE_ACTIVE) {
 			LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri));
 			node = node->next;
 			continue;
@@ -98,13 +149,17 @@ error:
 	return -1;
 }
 
-/* send a dmq message
+/**
+ * @brief send a dmq message
+ *
  * peer - the peer structure on behalf of which we are sending
  * body - the body of the message
  * node - we send the message to this node
  * resp_cback - a response callback that gets called when the transaction is complete
  */
-int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards) {
+int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node,
+		dmq_resp_cback_t* resp_cback, int max_forwards)
+{
 	uac_req_t uac_r;
 	str str_hdr = {0, 0};
 	str from, to, req_uri;
@@ -113,8 +168,12 @@ int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cba
 	int len = 0;
 	
 	/* Max-Forwards */
-	str_hdr.len = 18 + CRLF_LEN;
+	str_hdr.len = 20 + CRLF_LEN;
 	str_hdr.s = pkg_malloc(str_hdr.len);
+	if(str_hdr.s==NULL) {
+		LM_ERR("no more pkg\n");
+		return -1;
+	}
 	len += sprintf(str_hdr.s, "Max-Forwards: %d%s", max_forwards, CRLF);
 	str_hdr.len = len;
 	
@@ -124,7 +183,8 @@ int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cba
 	cb_param->node = shm_dup_node(node);
 	
 	if(build_uri_str(&peer->peer_id, &dmq_server_uri, &from) < 0) {
-		LM_ERR("error building from string [username %.*s]\n", STR_FMT(&peer->peer_id));
+		LM_ERR("error building from string [username %.*s]\n",
+				STR_FMT(&peer->peer_id));
 		goto error;
 	}
 	if(build_uri_str(&peer->peer_id, &node->uri, &to) < 0) {
@@ -133,8 +193,8 @@ int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cba
 	}
 	req_uri = to;
 	
-	set_uac_req(&uac_r, &dmq_request_method, &str_hdr, body, NULL, TMCB_LOCAL_COMPLETED,
-			dmq_tm_callback, (void*)cb_param);
+	set_uac_req(&uac_r, &dmq_request_method, &str_hdr, body, NULL,
+			TMCB_LOCAL_COMPLETED, dmq_tm_callback, (void*)cb_param);
 	result = tmb.t_request(&uac_r, &req_uri,
 			       &to, &from,
 			       NULL);
@@ -149,13 +209,28 @@ error:
 	return -1;
 }
 
-int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to, char* body) {
+/**
+ * @brief config file function for sending dmq message
+ */
+int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to, char* body)
+{
 	str peer_str;
-	get_str_fparam(&peer_str, msg, (fparam_t*)peer);
 	str to_str;
-	get_str_fparam(&to_str, msg, (fparam_t*)to);
 	str body_str;
-	get_str_fparam(&body_str, msg, (fparam_t*)body);
+	
+	if(get_str_fparam(&peer_str, msg, (fparam_t*)peer)<0) {
+		LM_ERR("cannot get peer value\n");
+		return -1;
+	}
+	if(get_str_fparam(&to_str, msg, (fparam_t*)to)<0) {
+		LM_ERR("cannot get dst value\n");
+		return -1;
+	}
+	if(get_str_fparam(&body_str, msg, (fparam_t*)body)<0) {
+		LM_ERR("cannot get body value\n");
+		return -1;
+	}
+	
 	LM_INFO("cfg_dmq_send_message: %.*s - %.*s - %.*s\n",
 		peer_str.len, peer_str.s,
 		to_str.len, to_str.s,
@@ -180,27 +255,32 @@ int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to, char* body)
 		LM_ERR("cannot find dmq_node: %.*s\n", to_str.len, to_str.s);
 		goto error;
 	}
-	if(dmq_send_message(destination_peer, &body_str, to_dmq_node, &notification_callback, 1) < 0) {
+	if(dmq_send_message(destination_peer, &body_str, to_dmq_node,
+				&notification_callback, 1) < 0) {
 		LM_ERR("cannot send dmq message\n");
 		goto error;
 	}
-	return 0;
+	return 1;
 error:
 	return -1;
 }
 
-/* pings the servers in the nodelist
+/**
+ * @brief pings the servers in the nodelist
+ *
  * if the server does not reply to the ping, it is removed from the list
  * the ping messages are actualy notification requests
  * this way the ping will have two uses:
  *   - checks if the servers in the list are up and running
  *   - updates the list of servers from the other nodes
  */
-void ping_servers(unsigned int ticks,void *param) {
-	str* body = build_notification_body();
+void ping_servers(unsigned int ticks, void *param) {
+	str* body;
 	int ret;
 	LM_DBG("ping_servers\n");
-	ret = bcast_dmq_message(dmq_notification_peer, body, notification_node, &notification_callback, 1);
+	body = build_notification_body();
+	ret = bcast_dmq_message(dmq_notification_peer, body, notification_node,
+			&notification_callback, 1);
 	pkg_free(body->s);
 	pkg_free(body);
 	if(ret < 0) {

+ 33 - 5
modules/dmq/dmq_funcs.h

@@ -1,5 +1,30 @@
-#ifndef DMQ_FUNCS_H
-#define DMQ_FUNCS_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _DMQ_FUNCS_H_
+#define _DMQ_FUNCS_H_
 
 #include "../../str.h"
 #include "../../modules/tm/dlg.h"
@@ -21,10 +46,13 @@ typedef struct dmq_cback_param {
 	dmq_node_t* node;
 } dmq_cback_param_t;
 
-int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to, char* body);
+int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to,
+		char* body);
 dmq_peer_t* register_dmq_peer(dmq_peer_t* peer);
-int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards);
-int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards);
+int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node,
+		dmq_resp_cback_t* resp_cback, int max_forwards);
+int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except,
+		dmq_resp_cback_t* resp_cback, int max_forwards);
 
 #endif
 

+ 157 - 28
modules/dmq/dmqnode.c

@@ -1,3 +1,28 @@
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
 #include "../../ut.h"
 #include "dmqnode.h"
 #include "dmq.h"
@@ -12,7 +37,11 @@ str dmq_node_active_str = str_init("active");
 str dmq_node_disabled_str = str_init("disabled");
 str dmq_node_timeout_str = str_init("timeout");
 
-str* get_status_str(int status) {
+/**
+ * @brief get the string status of the node
+ */
+str* get_status_str(int status)
+{
 	switch(status) {
 		case DMQ_NODE_ACTIVE: {
 			return &dmq_node_active_str;
@@ -29,14 +58,27 @@ str* get_status_str(int status) {
 	}
 }
 
-dmq_node_list_t* init_dmq_node_list() {
-	dmq_node_list_t* node_list = shm_malloc(sizeof(dmq_node_list_t));
+/**
+ * @brief initialize dmg node list
+ */
+dmq_node_list_t* init_dmq_node_list()
+{
+	dmq_node_list_t* node_list;
+	node_list = shm_malloc(sizeof(dmq_node_list_t));
+	if(node_list==NULL) {
+		LM_ERR("no more shm\n");
+		return NULL;
+	}
 	memset(node_list, 0, sizeof(dmq_node_list_t));
 	lock_init(&node_list->lock);
 	return node_list;
 }
 
-int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode) {
+/**
+ * @brief compare dmq node addresses
+ */
+int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode)
+{
 	if(!node || !cmpnode) {
 		LM_ERR("cmp_dmq_node - null node received\n");
 		return -1;
@@ -45,7 +87,11 @@ int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode) {
 	       STR_EQ(node->uri.port, cmpnode->uri.port);
 }
 
-str* get_param_value(param_t* params, str* param) {
+/**
+ * @brief get the value of a parameter
+ */
+str* get_param_value(param_t* params, str* param)
+{
 	while (params) {
 		if ((params->name.len == param->len) &&
 		    (strncmp(params->name.s, param->s, param->len) == 0)) {
@@ -56,7 +102,11 @@ str* get_param_value(param_t* params, str* param) {
 	return NULL;
 }
 
-int set_dmq_node_params(dmq_node_t* node, param_t* params) {
+/**
+ * @brief set the parameters for the node
+ */
+int set_dmq_node_params(dmq_node_t* node, param_t* params)
+{
 	str* status;
 	if(!params) {
 		LM_DBG("no parameters given\n");
@@ -80,26 +130,45 @@ error:
 	return -1;
 }
 
-int set_default_dmq_node_params(dmq_node_t* node) {
+/**
+ * @brief set default node params
+ */
+int set_default_dmq_node_params(dmq_node_t* node)
+{
 	node->status = DMQ_NODE_ACTIVE;
 	return 0;
 }
 
+/**
+ * @brief build a dmq node
+ */
 dmq_node_t* build_dmq_node(str* uri, int shm) {
-	dmq_node_t* ret;
+	dmq_node_t* ret = NULL;
 	param_hooks_t hooks;
 	param_t* params;
 	
 	LM_DBG("build_dmq_node %.*s with %s memory\n", STR_FMT(uri), shm?"shm":"private");
 	
 	if(shm) {
-		ret = shm_malloc(sizeof(*ret));
-		memset(ret, 0, sizeof(*ret));
-		shm_str_dup(&ret->orig_uri, uri);
+		ret = shm_malloc(sizeof(dmq_node_t));
+		if(ret==NULL) {
+			LM_ERR("no more shm\n");
+			goto error;
+		}
+		memset(ret, 0, sizeof(dmq_node_t));
+		if(shm_str_dup(&ret->orig_uri, uri)<0) {
+			goto error;
+		}
 	} else {
-		ret = pkg_malloc(sizeof(*ret));
-		memset(ret, 0, sizeof(*ret));
-		pkg_str_dup(&ret->orig_uri, uri);
+		ret = pkg_malloc(sizeof(dmq_node_t));
+		if(ret==NULL) {
+			LM_ERR("no more pkg\n");
+			goto error;
+		}
+		memset(ret, 0, sizeof(dmq_node_t));
+		if(pkg_str_dup(&ret->orig_uri, uri)<0) {
+			goto error;
+		}
 	}
 	set_default_dmq_node_params(ret);
 	if(parse_uri(ret->orig_uri.s, ret->orig_uri.len, &ret->uri) < 0) {
@@ -131,19 +200,39 @@ dmq_node_t* build_dmq_node(str* uri, int shm) {
 		LM_DBG("no dmqnode params found\n");		
 	}
 	return ret;
+
 error:
+	if(ret!=NULL) {
+		/* tbd: free uri and params */
+		if(shm) {
+			shm_free(ret);
+		} else {
+			pkg_free(ret);
+		}
+	}
 	return NULL;
 }
 
-dmq_node_t* find_dmq_node_uri(dmq_node_list_t* list, str* uri) {
+/**
+ * @brief find dmq node by uri
+ */
+dmq_node_t* find_dmq_node_uri(dmq_node_list_t* list, str* uri)
+{
 	dmq_node_t *ret, *find;
 	find =  build_dmq_node(uri, 0);
+	if(find==NULL)
+		return NULL;
 	ret = find_dmq_node(list, find);
 	destroy_dmq_node(find, 0);
 	return ret;
 }
 
-void destroy_dmq_node(dmq_node_t* node, int shm) {
+/**
+ * @brief destroy dmq node
+ */
+void destroy_dmq_node(dmq_node_t* node, int shm)
+{
+	/* tbd: check inner fields */
 	if(shm) {
 		shm_free_node(node);
 	} else {
@@ -151,7 +240,11 @@ void destroy_dmq_node(dmq_node_t* node, int shm) {
 	}
 }
 
-dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
+/**
+ * @brief find dmq node
+ */
+dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node)
+{
 	dmq_node_t* cur = list->nodes;
 	while(cur) {
 		if(cmp_dmq_node(cur, node)) {
@@ -162,32 +255,58 @@ dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
 	return NULL;
 }
 
-dmq_node_t* shm_dup_node(dmq_node_t* node) {
-	dmq_node_t* newnode = shm_malloc(sizeof(dmq_node_t));
+/**
+ * @brief duplicate dmq node
+ */
+dmq_node_t* shm_dup_node(dmq_node_t* node)
+{
+	dmq_node_t* newnode;
+	newnode = shm_malloc(sizeof(dmq_node_t));
+	if(newnode==NULL) {
+		LM_ERR("no more shm\n");
+		return NULL;
+	}
 	memcpy(newnode, node, sizeof(dmq_node_t));
-	shm_str_dup(&newnode->orig_uri, &node->orig_uri);
-	if(parse_uri(newnode->orig_uri.s, newnode->orig_uri.len, &newnode->uri) < 0) {
+	newnode->orig_uri.s = NULL;
+	if(shm_str_dup(&newnode->orig_uri, &node->orig_uri)<0) {
+		goto error;
+	}
+	if(parse_uri(newnode->orig_uri.s, newnode->orig_uri.len,
+				&newnode->uri) < 0) {
 		LM_ERR("error in parsing node uri\n");
 		goto error;
 	}
 	return newnode;
 error:
-	shm_free(newnode->orig_uri.s);
+	if(newnode->orig_uri.s!=NULL)
+		shm_free(newnode->orig_uri.s);
 	shm_free(newnode);
 	return NULL;
 }
 
-void shm_free_node(dmq_node_t* node) {
+/**
+ * @brief free shm dmq node
+ */
+void shm_free_node(dmq_node_t* node)
+{
 	shm_free(node->orig_uri.s);
 	shm_free(node);
 }
 
-void pkg_free_node(dmq_node_t* node) {
+/**
+ * @brief free pkg dmq node
+ */
+void pkg_free_node(dmq_node_t* node)
+{
 	pkg_free(node->orig_uri.s);
 	pkg_free(node);
 }
 
-int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
+/**
+ * @brief delete dmq node
+ */
+int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node)
+{
 	dmq_node_t *cur, **prev;
 	lock_get(&list->lock);
 	cur = list->nodes;
@@ -206,8 +325,14 @@ int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
 	return 0;
 }
 
-dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri) {
-	dmq_node_t* newnode = build_dmq_node(uri, 1);
+/**
+ * @brief add dmq node
+ */
+dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri)
+{
+	dmq_node_t* newnode;
+	
+	newnode = build_dmq_node(uri, 1);
 	if(!newnode) {
 		LM_ERR("error creating node\n");
 		goto error;
@@ -223,6 +348,9 @@ error:
 	return NULL;
 }
 
+/**
+ * @brief build dmq node string
+ */
 int build_node_str(dmq_node_t* node, char* buf, int buflen) {
 	/* sip:host:port;status=[status] */
 	int len = 0;
@@ -242,7 +370,8 @@ int build_node_str(dmq_node_t* node, char* buf, int buflen) {
 	len += 1;
 	memcpy(buf + len, "status=", 7);
 	len += 7;
-	memcpy(buf + len, get_status_str(node->status)->s, get_status_str(node->status)->len);
+	memcpy(buf + len, get_status_str(node->status)->s,
+			get_status_str(node->status)->len);
 	len += get_status_str(node->status)->len;
 	return len;
 }

+ 27 - 2
modules/dmq/dmqnode.h

@@ -1,5 +1,30 @@
-#ifndef DMQNODE_H
-#define DMQNODE_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _DMQNODE_H_
+#define _DMQNODE_H_
 
 #include <string.h>
 #include <stdlib.h>

+ 35 - 2
modules/dmq/message.c

@@ -1,6 +1,32 @@
 #include "../../parser/parse_to.h"
 #include "../../parser/parse_uri.h"
 #include "../../sip_msg_clone.h"
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+
 #include "../../parser/parse_content.h"
 #include "../../parser/parse_from.h"
 #include "../../ut.h"
@@ -14,7 +40,11 @@ str dmq_400_rpl  = str_init("Bad request");
 str dmq_500_rpl  = str_init("Server Internal Error");
 str dmq_404_rpl  = str_init("User Not Found");
 
-int dmq_handle_message(struct sip_msg* msg, char* str1, char* str2) {
+/**
+ * @brief config function to handle dmq messages
+ */
+int dmq_handle_message(struct sip_msg* msg, char* str1, char* str2)
+{
 	dmq_peer_t* peer;
 	struct sip_msg* cloned_msg = NULL;
 	int cloned_msg_len;
@@ -43,7 +73,10 @@ int dmq_handle_message(struct sip_msg* msg, char* str1, char* str2) {
 		LM_ERR("error cloning sip message\n");
 		goto error;
 	}
-	add_dmq_job(cloned_msg, peer);
+	if(add_dmq_job(cloned_msg, peer)<0) {
+		LM_ERR("failed to add dmq job\n");
+		goto error;
+	}
 	return 0;
 error:
 	return -1;

+ 28 - 0
modules/dmq/message.h

@@ -1,3 +1,31 @@
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _MESSAGE_H_
+#define _MESSAGE_H_
 
 int dmq_handle_message(struct sip_msg*, char*, char*);
 
+#endif

+ 82 - 11
modules/dmq/notification_peer.c

@@ -1,9 +1,39 @@
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+
 #include "notification_peer.h"
 
 static str notification_content_type = str_init("text/plain");
 dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
 
-int add_notification_peer() {
+/**
+ * @brief add notification peer
+ */
+int add_notification_peer()
+{
 	dmq_peer_t not_peer;
 	not_peer.callback = dmq_notification_callback;
 	not_peer.description.s = "notification_peer";
@@ -28,9 +58,15 @@ error:
 	return -1;
 }
 
-dmq_node_t* add_server_and_notify(str* server_address) {
+/**
+ * @brief add a server node and notify it
+ */
+dmq_node_t* add_server_and_notify(str* server_address)
+{
 	/* add the notification server to the node list - if any */
-	dmq_node_t* node = add_dmq_node(node_list, server_address);
+	dmq_node_t* node;
+	
+	node = add_dmq_node(node_list, server_address);
 	if(!node) {
 		LM_ERR("error adding notification node\n");
 		goto error;
@@ -58,7 +94,8 @@ error:
  * 	sip:host2:port2;param2=value2
  * 	...
  */
-int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg) {
+int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg)
+{
 	int content_length, total_nodes = 0;
 	str body;
 	str tmp_uri;
@@ -115,7 +152,11 @@ error:
 	return -1;
 }
 
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
+/**
+ * @brief dmq notification callback
+ */
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
+{
 	int nodes_recv;
 	str* response_body = NULL;
 	unsigned int maxforwards = 1;
@@ -137,6 +178,10 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
 	nodes_recv = extract_node_list(node_list, msg);
 	LM_DBG("received %d new nodes\n", nodes_recv);
 	response_body = build_notification_body();
+	if(response_body==NULL) {
+		LM_ERR("no response body\n");
+		goto error;
+	}
 	resp->content_type = notification_content_type;
 	resp->reason = dmq_200_rpl;
 	resp->body = *response_body;
@@ -145,7 +190,8 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
 	/* if we received any new nodes tell about them to the others */
 	if(nodes_recv > 0 && maxforwards > 0) {
 		/* maxforwards is set to 0 so that the message is will not be in a spiral */
-		bcast_dmq_message(dmq_notification_peer, response_body, 0, &notification_callback, maxforwards);
+		bcast_dmq_message(dmq_notification_peer, response_body, 0,
+				&notification_callback, maxforwards);
 	}
 	LM_DBG("broadcasted message\n");
 	pkg_free(response_body);
@@ -161,7 +207,8 @@ error:
  * sip:host2:port2;param2=value2\r\n
  * sip:host3:port3;param3=value3
  */
-str* build_notification_body() {
+str* build_notification_body()
+{
 	/* the length of the current line describing the server */
 	int slen;
 	/* the current length of the body */
@@ -169,10 +216,19 @@ str* build_notification_body() {
 	dmq_node_t* cur_node = NULL;
 	str* body;
 	body = pkg_malloc(sizeof(str));
+	if(body==NULL) {
+		LM_ERR("no more pkg\n");
+		return NULL;
+	}
 	memset(body, 0, sizeof(str));
 	/* we allocate a chunk of data for the body */
 	body->len = NBODY_LEN;
 	body->s = pkg_malloc(body->len);
+	if(body->s==NULL) {
+		LM_ERR("no more pkg\n");
+		pkg_free(body);
+		return NULL;
+	}
 	/* we add each server to the body - each on a different line */
 	lock_get(&node_list->lock);
 	cur_node = node_list->nodes;
@@ -198,16 +254,31 @@ error:
 	return NULL;
 }
 
-int request_nodelist(dmq_node_t* node, int forward) {
-	str* body = build_notification_body();
+/**
+ * @brief request node list
+ */
+int request_nodelist(dmq_node_t* node, int forward)
+{
+	str* body;
 	int ret;
-	ret = dmq_send_message(dmq_notification_peer, body, node, &notification_callback, forward);
+	body = build_notification_body();
+	if(body==NULL) {
+		LM_ERR("no notification body\n");
+		return -1;
+	}
+	ret = dmq_send_message(dmq_notification_peer, body, node,
+			&notification_callback, forward);
 	pkg_free(body->s);
 	pkg_free(body);
 	return ret;
 }
 
-int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param) {
+/**
+ * @brief notification response callback
+ */
+int notification_resp_callback_f(struct sip_msg* msg, int code,
+		dmq_node_t* node, void* param)
+{
 	int ret;
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
 	if(code == 408) {

+ 30 - 1
modules/dmq/notification_peer.h

@@ -1,3 +1,30 @@
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#ifndef _NOTIFICATION_PEER_H_
+#define _NOTIFICATION_PEER_H_
+
 #include "../../parser/msg_parser.h"
 #include "../../parser/parse_content.h"
 #include "../../ut.h"
@@ -22,6 +49,8 @@ int request_nodelist(dmq_node_t* node, int forward);
 dmq_node_t* add_server_and_notify(str* server_address);
 
 /* helper functions */
-extern int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
+extern int notification_resp_callback_f(struct sip_msg* msg, int code,
+		dmq_node_t* node, void* param);
 extern dmq_resp_cback_t notification_callback;
 
+#endif

+ 85 - 13
modules/dmq/peer.c

@@ -1,49 +1,121 @@
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
 #include "peer.h"
 #include "dmq.h"
 
-dmq_peer_list_t* init_peer_list() {
-	dmq_peer_list_t* peer_list = shm_malloc(sizeof(dmq_peer_list_t));
+/**
+ * @brief init peer list
+ */
+dmq_peer_list_t* init_peer_list()
+{
+	dmq_peer_list_t* peer_list;
+	peer_list = shm_malloc(sizeof(dmq_peer_list_t));
+	if(peer_list==NULL) {
+		LM_ERR("no more shm\n");
+		return NULL;
+	}
 	memset(peer_list, 0, sizeof(dmq_peer_list_t));
 	lock_init(&peer_list->lock);
 	return peer_list;
 }
 
-dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
-	dmq_peer_t* cur = peer_list->peers;
+/**
+ * @brief search peer list
+ */
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer)
+{
+	dmq_peer_t* crt;
 	int len;
-	while(cur) {
+	crt = peer_list->peers;
+	while(crt) {
 		/* len - the minimum length of the two strings */
-		len = cur->peer_id.len < peer->peer_id.len ? cur->peer_id.len:peer->peer_id.len;
-		if(strncasecmp(cur->peer_id.s, peer->peer_id.s, len) == 0) {
-			return cur;
+		len = (crt->peer_id.len < peer->peer_id.len)
+			? crt->peer_id.len:peer->peer_id.len;
+		if(strncasecmp(crt->peer_id.s, peer->peer_id.s, len) == 0) {
+			return crt;
 		}
-		cur = cur->next;
+		crt = crt->next;
 	}
 	return 0;
 }
 
-dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
-	dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t));
+/**
+ * @brief add peer
+ */
+dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer)
+{
+	dmq_peer_t* new_peer = NULL;
+	
+	new_peer = shm_malloc(sizeof(dmq_peer_t));
+	if(new_peer==NULL) {
+		LM_ERR("no more shm\n");
+		return NULL;
+	}
 	*new_peer = *peer;
 	
 	/* copy the str's */
 	new_peer->peer_id.s = shm_malloc(peer->peer_id.len);
+	if(new_peer->peer_id.s==NULL) {
+		LM_ERR("no more shm\n");
+		shm_free(new_peer);
+		return NULL;
+	}
 	memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len);
+	new_peer->peer_id.len = peer->peer_id.len;
+
 	new_peer->description.s = shm_malloc(peer->description.len);
+	if(new_peer->description.s==NULL) {
+		LM_ERR("no more shm\n");
+		shm_free(new_peer->peer_id.s);
+		shm_free(new_peer);
+		return NULL;
+	}
 	memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len);
+	new_peer->peer_id.len = peer->peer_id.len;
 	
 	new_peer->next = peer_list->peers;
 	peer_list->peers = new_peer;
 	return new_peer;
 }
 
-dmq_peer_t* find_peer(str peer_id) {
+/**
+ * @brief find peer by id
+ */
+dmq_peer_t* find_peer(str peer_id)
+{
 	dmq_peer_t foo_peer;
 	foo_peer.peer_id = peer_id;
 	return search_peer_list(peer_list, &foo_peer);
 }
 
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp) {
+/**
+ * @empty callback
+ */
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp)
+{
 	return 0;
 }
 

+ 27 - 2
modules/dmq/peer.h

@@ -1,5 +1,30 @@
-#ifndef PEER_H
-#define PEER_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _PEER_H_
+#define _PEER_H_
 
 #include <string.h>
 #include <stdlib.h>

+ 109 - 24
modules/dmq/worker.c

@@ -1,10 +1,37 @@
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
 #include "dmq.h"
 #include "peer.h"
 #include "worker.h"
 #include "../../data_lump_rpl.h"
 #include "../../mod_fix.h"
 
-/* set the body of a response */
+/**
+ * @brief set the body of a response
+ */
 static int set_reply_body(struct sip_msg* msg, str* body, str* content_type)
 {
 	char* buf;
@@ -41,16 +68,23 @@ static int set_reply_body(struct sip_msg* msg, str* body, str* content_type)
 	return 1;
 }
 
-void worker_loop(int id) {
-	dmq_worker_t* worker = &workers[id];
+/**
+ * @brief dmq worker loop
+ */
+void worker_loop(int id)
+{
+	dmq_worker_t* worker;
 	dmq_job_t* current_job;
 	peer_reponse_t peer_response;
 	int ret_value;
+
+	worker = &workers[id];
 	for(;;) {
 		LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
 		lock_get(&worker->lock);
 		LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
-		/* multiple lock_release calls might be performed, so remove from queue until empty */
+		/* multiple lock_release calls might be performed, so remove
+		 * from queue until empty */
 		do {
 			/* fill the response with 0's */
 			memset(&peer_response, 0, sizeof(peer_response));
@@ -64,13 +98,15 @@ void worker_loop(int id) {
 				}
 				/* add the body to the reply */
 				if(peer_response.body.s) {
-					if(set_reply_body(current_job->msg, &peer_response.body, &peer_response.content_type) < 0) {
+					if(set_reply_body(current_job->msg, &peer_response.body,
+								&peer_response.content_type) < 0) {
 						LM_ERR("error adding lumps\n");
 						continue;
 					}
 				}
 				/* send the reply */
-				if(slb.freply(current_job->msg, peer_response.resp_code, &peer_response.reason) < 0)
+				if(slb.freply(current_job->msg, peer_response.resp_code,
+							&peer_response.reason) < 0)
 				{
 					LM_ERR("error sending reply\n");
 				}
@@ -89,10 +125,17 @@ void worker_loop(int id) {
 	}
 }
 
-int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
+/**
+ * @brief add a dmq job
+ */
+int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer)
+{
 	int i, found_available = 0;
+	int ret;
 	dmq_job_t new_job = { 0 };
 	dmq_worker_t* worker;
+
+	ret = 0;
 	new_job.f = peer->callback;
 	new_job.msg = msg;
 	new_job.orig_peer = peer;
@@ -102,26 +145,33 @@ int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
 	}
 	/* initialize the worker with the first one */
 	worker = workers;
-	/* search for an available worker, or, if not possible, for the least busy one */
+	/* search for an available worker, or, if not possible,
+	 * for the least busy one */
 	for(i = 0; i < num_workers; i++) {
 		if(job_queue_size(workers[i].queue) == 0) {
 			worker = &workers[i];
 			found_available = 1;
 			break;
-		} else if(job_queue_size(workers[i].queue) < job_queue_size(worker->queue)) {
+		} else if(job_queue_size(workers[i].queue)
+				< job_queue_size(worker->queue)) {
 			worker = &workers[i];
 		}
 	}
 	if(!found_available) {
-		LM_DBG("no available worker found, passing job to the least busy one [%d %d]\n",
-		       worker->pid, job_queue_size(worker->queue));
+		LM_DBG("no available worker found, passing job"
+				" to the least busy one [%d %d]\n",
+				worker->pid, job_queue_size(worker->queue));
 	}
-	job_queue_push(worker->queue, &new_job);
+	ret = job_queue_push(worker->queue, &new_job);
 	lock_release(&worker->lock);
-	return 0;
+	return ret;
 }
 
-void init_worker(dmq_worker_t* worker) {
+/**
+ * @brief init dmq worker
+ */
+void init_worker(dmq_worker_t* worker)
+{
 	memset(worker, 0, sizeof(*worker));
 	lock_init(&worker->lock);
 	// acquire the lock for the first time - so that dmq_worker_loop blocks
@@ -129,26 +179,55 @@ void init_worker(dmq_worker_t* worker) {
 	worker->queue = alloc_job_queue();
 }
 
-job_queue_t* alloc_job_queue() {
-	job_queue_t* queue = shm_malloc(sizeof(job_queue_t));
+/**
+ * @brief allog dmq job queue
+ */
+job_queue_t* alloc_job_queue()
+{
+	job_queue_t* queue;
+	
+	queue = shm_malloc(sizeof(job_queue_t));
+	if(queue==NULL) {
+		LM_ERR("no more shm\n");
+		return NULL;
+	}
+	memset(queue, 0, sizeof(job_queue_t));
 	atomic_set(&queue->count, 0);
-	queue->front = NULL;
-	queue->back = NULL;
 	lock_init(&queue->lock);
 	return queue;
 }
 
-void destroy_job_queue(job_queue_t* queue) {
-	shm_free(queue);
+/**
+ * @ brief destroy job queue
+ */
+void destroy_job_queue(job_queue_t* queue)
+{
+	if(queue!=NULL)
+		shm_free(queue);
 }
 
-int job_queue_size(job_queue_t* queue) {
+/**
+ * @brief return job queue size
+ */
+int job_queue_size(job_queue_t* queue)
+{
 	return atomic_get(&queue->count);
 }
 
-void job_queue_push(job_queue_t* queue, dmq_job_t* job) {
+/**
+ * @brief push to job queue
+ */
+int job_queue_push(job_queue_t* queue, dmq_job_t* job)
+{
 	/* we need to copy the dmq_job into a newly created dmq_job in shm */
-	dmq_job_t* newjob = shm_malloc(sizeof(dmq_job_t));
+	dmq_job_t* newjob;
+	
+	newjob = shm_malloc(sizeof(dmq_job_t));
+	if(newjob==NULL) {
+		LM_ERR("no more shm\n");
+		return -1;
+	}
+
 	*newjob = *job;
 	
 	lock_get(&queue->lock);
@@ -163,8 +242,14 @@ void job_queue_push(job_queue_t* queue, dmq_job_t* job) {
 	}
 	atomic_inc(&queue->count);
 	lock_release(&queue->lock);
+	return 0;
 }
-dmq_job_t* job_queue_pop(job_queue_t* queue) {
+
+/**
+ * @brief pop from job queue
+ */
+dmq_job_t* job_queue_pop(job_queue_t* queue)
+{
 	dmq_job_t* front;
 	lock_get(&queue->lock);
 	if(!queue->front) {

+ 27 - 3
modules/dmq/worker.h

@@ -1,5 +1,29 @@
-#ifndef DMQ_WORKER_H
-#define DMQ_WORKER_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#ifndef _DMQ_WORKER_H_
+#define _DMQ_WORKER_H_
 
 #include "peer.h"
 #include "../../locking.h"
@@ -36,7 +60,7 @@ void worker_loop(int id);
 
 job_queue_t* alloc_job_queue();
 void destroy_job_queue(job_queue_t* queue);
-void job_queue_push(job_queue_t* queue, dmq_job_t* job);
+int job_queue_push(job_queue_t* queue, dmq_job_t* job);
 dmq_job_t* job_queue_pop(job_queue_t* queue);
 int job_queue_size(job_queue_t* queue);