浏览代码

added support for adding parameters to dmqnodes

Marius Bucur 14 年之前
父节点
当前提交
841b6c1fb0

+ 3 - 0
modules_k/dmq/dmq.c

@@ -192,6 +192,9 @@ static int mod_init(void) {
 	 * add the ping timer
 	 * it pings the servers once in a while so that we know which failed
 	 */
+	if(ping_interval < MIN_PING_INTERVAL) {
+		ping_interval = MIN_PING_INTERVAL;
+	}
 	register_timer(ping_servers, 0, ping_interval);
 	
 	return 0;

+ 1 - 0
modules_k/dmq/dmq.h

@@ -13,6 +13,7 @@
 #include "dmqnode.h"
 
 #define DEFAULT_NUM_WORKERS	2
+#define MIN_PING_INTERVAL	4
 
 extern int num_workers;
 extern dmq_worker_t* workers;

+ 24 - 8
modules_k/dmq/dmq_funcs.c

@@ -68,17 +68,22 @@ int build_uri_str(str* username, struct sip_uri* uri, str* from) {
  * except - we do not send the message to this node
  * resp_cback - a response callback that gets called when the transaction is complete
  */
-int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback) {
+int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards) {
 	dmq_node_t* node;
 	
 	lock_get(&node_list->lock);
 	node = node_list->nodes;
 	while(node) {
-		if((except && cmp_dmq_node(node, except)) || node->local) {
+		/* we do not send the message to the following:
+		 *   - the except node
+		 *   - itself
+		 *   - any inactive nodes
+		 */
+		if((except && cmp_dmq_node(node, except)) || node->local || node->status != DMQ_NODE_ACTIVE) {
 			node = node->next;
 			continue;
 		}
-		if(send_dmq_message(peer, body, node, resp_cback) < 0) {
+		if(send_dmq_message(peer, body, node, resp_cback, max_forwards) < 0) {
 			LM_ERR("error sending dmq message\n");
 			goto error;
 		}
@@ -97,12 +102,19 @@ error:
  * node - we send the message to this node
  * resp_cback - a response callback that gets called when the transaction is complete
  */
-int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback) {
+int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards) {
 	uac_req_t uac_r;
 	str str_hdr = {0, 0};
 	str from, to, req_uri;
 	dmq_cback_param_t* cb_param = NULL;
 	int result = 0;
+	int len = 0;
+	
+	/* Max-Forwards */
+	str_hdr.len = 18 + CRLF_LEN;
+	str_hdr.s = pkg_malloc(str_hdr.len);
+	len += sprintf(str_hdr.s, "Max-Forwards: %d%s", max_forwards, CRLF);
+	str_hdr.len = len;
 	
 	cb_param = shm_malloc(sizeof(*cb_param));
 	memset(cb_param, 0, sizeof(*cb_param));
@@ -111,11 +123,11 @@ int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cba
 	
 	if(build_uri_str(&peer->peer_id, &dmq_server_uri, &from) < 0) {
 		LM_ERR("error building from string [username %.*s]\n", STR_FMT(&peer->peer_id));
-		return -1;
+		goto error;
 	}
 	if(build_uri_str(&peer->peer_id, &node->uri, &to) < 0) {
 		LM_ERR("error building to string\n");
-		return -1;
+		goto error;
 	}
 	req_uri = to;
 	
@@ -126,9 +138,13 @@ int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cba
 			       NULL);
 	if(result < 0) {
 		LM_ERR("error in tmb.t_request_within\n");
-		return -1;
+		goto error;
 	}
+	pkg_free(str_hdr.s);
 	return 0;
+error:
+	pkg_free(str_hdr.s);
+	return -1;
 }
 
 /* pings the servers in the nodelist
@@ -142,7 +158,7 @@ void ping_servers(unsigned int ticks,void *param) {
 	str* body = build_notification_body();
 	int ret;
 	LM_DBG("ping_servers\n");
-	ret = bcast_dmq_message(dmq_notification_peer, body, notification_node, &notification_callback);
+	ret = bcast_dmq_message(dmq_notification_peer, body, notification_node, &notification_callback, 0);
 	pkg_free(body->s);
 	pkg_free(body);
 	if(ret < 0) {

+ 3 - 2
modules_k/dmq/dmq_funcs.h

@@ -4,6 +4,7 @@
 #include "../../str.h"
 #include "../../modules/tm/dlg.h"
 #include "../../modules/tm/tm_load.h"
+#include "../../config.h"
 #include "dmq.h"
 #include "peer.h"
 #include "worker.h"
@@ -21,7 +22,7 @@ typedef struct dmq_cback_param {
 } dmq_cback_param_t;
 
 dmq_peer_t* register_dmq_peer(dmq_peer_t* peer);
-int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback);
-int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback);
+int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards);
+int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards);
 
 #endif

+ 117 - 8
modules_k/dmq/dmqnode.c

@@ -3,7 +3,14 @@
 #include "dmq.h"
 
 dmq_node_t* self_node;
-dmq_node_t* notification_node;	
+dmq_node_t* notification_node;
+
+/* name */
+str dmq_node_status_str = str_init("status");
+/* possible values */
+str dmq_node_active_str = str_init("active");
+str dmq_node_disabled_str = str_init("disabled");
+str dmq_node_timeout_str = str_init("timeout");
 
 dmq_node_list_t* init_dmq_node_list() {
 	dmq_node_list_t* node_list = shm_malloc(sizeof(dmq_node_list_t));
@@ -21,6 +28,106 @@ inline int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode) {
 	       STR_EQ(node->uri.port, cmpnode->uri.port);
 }
 
+static str* get_param_value(param_t* params, str* param) {
+	while (params) {
+		if ((params->name.len == param->len) &&
+		    (strncmp(params->name.s, param->s, param->len) == 0)) {
+			return &params->body;
+		}
+		params = params->next;
+	}
+	return NULL;
+}
+
+inline int set_dmq_node_params(dmq_node_t* node, param_t* params) {
+	str* status;
+	if(!params) {
+		LM_DBG("no parameters given\n");
+		return 0;
+	}
+	status = get_param_value(params, &dmq_node_status_str);
+	if(status) {
+		if(str_strcmp(status, &dmq_node_active_str)) {
+			node->status = DMQ_NODE_ACTIVE;
+		} else if(str_strcmp(status, &dmq_node_timeout_str)) {
+			node->status = DMQ_NODE_ACTIVE;
+		} else if(str_strcmp(status, &dmq_node_disabled_str)) {
+			node->status = DMQ_NODE_ACTIVE;
+		} else {
+			LM_ERR("invalid status parameter: %.*s\n", STR_FMT(status));
+			goto error;
+		}
+	}
+	return 0;
+error:
+	return -1;
+}
+
+inline dmq_node_t* build_dmq_node(str* uri, int shm) {
+	dmq_node_t* ret;
+	param_hooks_t hooks;
+	param_t* params;
+	
+	LM_DBG("build_dmq_node %.*s with %s memory\n", STR_FMT(uri), shm?"shm":"private");
+	
+	if(shm) {
+		ret = shm_malloc(sizeof(*ret));
+		memset(ret, 0, sizeof(*ret));
+		shm_str_dup(&ret->orig_uri, uri);
+	} else {
+		ret = pkg_malloc(sizeof(*ret));
+		memset(ret, 0, sizeof(*ret));
+		pkg_str_dup(&ret->orig_uri, uri);
+	}
+	if(parse_uri(ret->orig_uri.s, ret->orig_uri.len, &ret->uri) < 0) {
+		LM_ERR("error parsing uri\n");
+		goto error;
+	}
+	/* if any parameters found, parse them */
+	if(parse_params(&ret->uri.params, CLASS_ANY, &hooks, &params) < 0) {
+		LM_ERR("error parsing params\n");
+		goto error;
+	}
+	/* if any params found */
+	if(params) {
+		if(shm) {
+			if(shm_duplicate_params(&ret->params, params) < 0) {
+				LM_ERR("error duplicating params\n");
+				free_params(params);
+				goto error;
+			}
+			free_params(params);
+		} else {
+			ret->params = params;
+		}
+		if(set_dmq_node_params(ret, ret->params) < 0) {
+			LM_ERR("error setting parameters\n");
+			goto error;
+		}
+	} else {
+		LM_DBG("no dmqnode params found\n");		
+	}
+	return ret;
+error:
+	return NULL;
+}
+
+inline dmq_node_t* find_dmq_node_uri(dmq_node_list_t* list, str* uri) {
+	dmq_node_t *ret, *find;
+	find =  build_dmq_node(uri, 0);
+	ret = find_dmq_node(list, find);
+	destroy_dmq_node(find, 0);
+	return ret;
+}
+
+inline void destroy_dmq_node(dmq_node_t* node, int shm) {
+	if(shm) {
+		shm_free_node(node);
+	} else {
+		pkg_free_node(node);
+	}
+}
+
 inline dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
 	dmq_node_t* cur = list->nodes;
 	while(cur) {
@@ -52,6 +159,11 @@ inline void shm_free_node(dmq_node_t* node) {
 	shm_free(node);
 }
 
+inline void pkg_free_node(dmq_node_t* node) {
+	pkg_free(node->orig_uri.s);
+	pkg_free(node);
+}
+
 inline int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
 	dmq_node_t *cur, **prev;
 	lock_get(&list->lock);
@@ -72,13 +184,12 @@ inline int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
 }
 
 inline dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri) {
-	dmq_node_t* newnode = shm_malloc(sizeof(dmq_node_t));
-	memset(newnode, 0, sizeof(dmq_node_t));
-	shm_str_dup(&newnode->orig_uri, uri);
-	if(parse_uri(newnode->orig_uri.s, newnode->orig_uri.len, &newnode->uri) < 0) {
-		LM_ERR("error in parsing node uri\n");
+	dmq_node_t* newnode = build_dmq_node(uri, 1);
+	if(!newnode) {
+		LM_ERR("error creating node\n");
 		goto error;
 	}
+	LM_DBG("dmq node successfully created\n");
 	lock_get(&list->lock);
 	newnode->next = list->nodes;
 	list->nodes = newnode;
@@ -86,7 +197,5 @@ inline dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri) {
 	lock_release(&list->lock);
 	return newnode;
 error:
-	shm_free(newnode->orig_uri.s);
-	shm_free(newnode);
 	return NULL;
 }

+ 13 - 1
modules_k/dmq/dmqnode.h

@@ -8,13 +8,18 @@
 #include "../../mem/mem.h"
 #include "../../mem/shm_mem.h"
 #include "../../parser/parse_uri.h"
+#include "../../parser/parse_param.h"
 
-#define NBODY_LEN 1024
+#define NBODY_LEN		1024
+#define DMQ_NODE_ACTIVE		1 << 1
+#define DMQ_NODE_TIMEOUT	1 << 2
+#define DMQ_NODE_DISABLED	1 << 3
 
 typedef struct dmq_node {
 	int local; /* local type set means the dmq dmqnode == self */
 	str orig_uri; /* original uri string - e.g. sip:127.0.0.1:5060;passive=true */
 	struct sip_uri uri; /* parsed uri string */
+	param_t* params; /* uri parameters */
 	int status; /* reserved - maybe something like active,timeout,disabled */
 	int last_notification; /* last notificatino receied from the node */
 	struct dmq_node* next; /* pointer to the next struct dmq_node */
@@ -26,14 +31,21 @@ typedef struct dmq_node_list {
 	int count; /* the number of nodes in the list */
 } dmq_node_list_t;
 
+extern str dmq_node_status_str;
+
 dmq_node_list_t* init_dmq_node_list();
+dmq_node_t* build_dmq_node(str* uri, int shm);
 int update_node_list(dmq_node_list_t* remote_list);
 dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri);
 dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node);
+dmq_node_t* find_dmq_node_uri(dmq_node_list_t* list, str* uri);
 int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node);
 int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode);
 dmq_node_t* shm_dup_node(dmq_node_t* node);
+void destroy_dmq_node(dmq_node_t* node, int shm);
 void shm_free_node(dmq_node_t* node);
+void pkg_free_node(dmq_node_t* node);
+int set_dmq_node_params(dmq_node_t* node, param_t* params);
 
 extern dmq_node_t* self_node;
 extern dmq_node_t* notification_node;	

+ 24 - 23
modules_k/dmq/notification_peer.c

@@ -1,7 +1,7 @@
 #include "notification_peer.h"
 
 static str notification_content_type = str_init("text/plain");
-dmq_resp_cback_t notification_callback = {&notification_callback_f, 0};
+dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
 
 int add_notification_peer() {
 	dmq_peer_t not_peer;
@@ -36,7 +36,7 @@ dmq_node_t* add_server_and_notify(str* server_address) {
 		goto error;
 	}
 	/* request initial list from the notification server */
-	if(request_nodelist(node) < 0) {
+	if(request_nodelist(node, 1) < 0) {
 		LM_ERR("error requesting initial nodelist\n");
 		goto error;
 	}
@@ -81,9 +81,6 @@ int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg) {
 	/* acquire big list lock */
 	lock_get(&update_list->lock);
 	while(tmp < end) {
-		cur = shm_malloc(sizeof(dmq_node_t));
-		memset(cur, 0, sizeof(*cur));
-		
 		match = q_memchr(tmp, '\n', end - tmp);
 		if(match) {
 			match++;
@@ -95,22 +92,19 @@ int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg) {
 		tmp_uri.s = tmp;
 		tmp_uri.len = match - tmp - 1;
 		tmp = match;
-		shm_str_dup(&cur->orig_uri, &tmp_uri);
 		/* trim the \r, \n and \0's */
-		trim_r(cur->orig_uri);
-		if(parse_uri(cur->orig_uri.s, cur->orig_uri.len, &cur->uri) < 0) {
-			LM_ERR("cannot parse uri\n");
-			goto error;
-		}
-		if(!find_dmq_node(update_list, cur)) {
-			LM_DBG("found new node %.*s\n", STR_FMT(&cur->orig_uri));
+		trim_r(tmp_uri);
+		if(!find_dmq_node_uri(update_list, &tmp_uri)) {
+			LM_DBG("found new node %.*s\n", STR_FMT(&tmp_uri));
+			cur = build_dmq_node(&tmp_uri, 1);
+			if(!cur) {
+				LM_ERR("error creating new dmq node\n");
+				goto error;
+			}
 			cur->next = update_list->nodes;
 			update_list->nodes = cur;
 			update_list->count++;
 			total_nodes++;
-		} else {
-			shm_free(cur->orig_uri.s);
-			shm_free(cur);
 		}
 	}
 	/* release big list lock */
@@ -118,14 +112,13 @@ int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg) {
 	return total_nodes;
 error:
 	lock_release(&update_list->lock);
-	shm_free(cur->orig_uri.s);
-	shm_free(cur);
 	return -1;
 }
 
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
 	int nodes_recv;
 	str* response_body = NULL;
+	unsigned int maxforwards = 1;
 	/* received dmqnode list */
 	LM_DBG("dmq triggered from dmq_notification_callback\n");
 	/* parse the message headers */
@@ -133,6 +126,13 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
 		LM_ERR("error parsing message headers\n");
 		goto error;
 	}
+	
+	/* extract the maxforwards value, if any */
+	if(msg->maxforwards) {
+		LM_DBG("max forwards: %.*s\n", STR_FMT(&msg->maxforwards->body));
+		str2int(&msg->maxforwards->body, &maxforwards);
+	}
+	
 	nodes_recv = extract_node_list(node_list, msg);
 	LM_DBG("received %d new nodes\n", nodes_recv);
 	response_body = build_notification_body();
@@ -142,8 +142,9 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
 	resp->resp_code = 200;
 	
 	/* if we received any new nodes tell about them to the others */
-	if(nodes_recv > 0) {
-		bcast_dmq_message(dmq_notification_peer, response_body, 0, &notification_callback);
+	if(nodes_recv > 0 && maxforwards) {
+		/* maxforwards is set to 0 so that the message is will not be in a spiral */
+		bcast_dmq_message(dmq_notification_peer, response_body, 0, &notification_callback, 0);
 	}
 	LM_DBG("broadcasted message\n");
 	pkg_free(response_body);
@@ -205,16 +206,16 @@ error:
 	return NULL;
 }
 
-int request_nodelist(dmq_node_t* node) {
+int request_nodelist(dmq_node_t* node, int forward) {
 	str* body = build_notification_body();
 	int ret;
-	ret = send_dmq_message(dmq_notification_peer, body, node, &notification_callback);
+	ret = send_dmq_message(dmq_notification_peer, body, node, &notification_callback, forward);
 	pkg_free(body->s);
 	pkg_free(body);
 	return ret;
 }
 
-int notification_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param) {
+int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param) {
 	int ret;
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
 	if(code == 408) {

+ 9 - 2
modules_k/dmq/notification_peer.h

@@ -11,9 +11,16 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
 int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg);
 str* build_notification_body();
 int build_node_str(dmq_node_t* node, char* buf, int buflen);
-int request_nodelist(dmq_node_t* node);
+/* request a nodelist from a server
+ * this is acomplished by a KDMQ request
+ * KDMQ notification@server:port
+ * node - the node to send to
+ * forward - flag that tells if the node receiving the message is allowed to 
+ *           forward the request to its own list
+ */
+int request_nodelist(dmq_node_t* node, int forward);
 dmq_node_t* add_server_and_notify(str* server_address);
 
 /* helper functions */
-extern int notification_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
+extern int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
 extern dmq_resp_cback_t notification_callback;