瀏覽代碼

dmq: reverted dmq to the state of v4.2.1

- some of the tunings in master introduced side effects
Daniel-Constantin Mierla 10 年之前
父節點
當前提交
6a9133a119
共有 7 個文件被更改,包括 13 次插入85 次删除
  1. 5 15
      modules/dmq/dmq.c
  2. 0 7
      modules/dmq/dmq_funcs.c
  3. 3 44
      modules/dmq/notification_peer.c
  4. 1 2
      modules/dmq/notification_peer.h
  5. 1 1
      modules/dmq/peer.c
  6. 2 5
      modules/dmq/peer.h
  7. 1 11
      modules/dmq/worker.c

+ 5 - 15
modules/dmq/dmq.c

@@ -75,12 +75,12 @@ sl_api_t slb;
 
 /** module variables */
 str dmq_request_method = str_init("KDMQ");
-dmq_worker_t* workers = NULL;
-dmq_peer_list_t* peer_list = 0;
+dmq_worker_t* workers;
+dmq_peer_list_t* peer_list;
 /* the list of dmq servers */
-dmq_node_list_t* node_list = NULL;
+dmq_node_list_t* node_list;
 // the dmq module is a peer itself for receiving notifications regarding nodes
-dmq_peer_t* dmq_notification_peer = NULL;
+dmq_peer_t* dmq_notification_peer;
 
 /** module functions */
 static int mod_init(void);
@@ -239,14 +239,7 @@ static int mod_init(void)
 		LM_ERR("error in shm_malloc\n");
 		return -1;
 	}
-
-	dmq_init_callback_done = shm_malloc(sizeof(int));
-	if (!dmq_init_callback_done) {
-		LM_ERR("no more shm\n");
-		return -1;
-	}
-	*dmq_init_callback_done = 0;
-
+	
 	/**
 	 * add the dmq notification peer.
 	 * the dmq is a peer itself so that it can receive node notifications
@@ -333,9 +326,6 @@ static void destroy(void) {
 	if (dmq_server_socket.s) {
 		pkg_free(dmq_server_socket.s);
 	}
-	if (dmq_init_callback_done) {
-		shm_free(dmq_init_callback_done);
-	}
 }
 
 static int handle_dmq_fixup(void** param, int param_no)

+ 0 - 7
modules/dmq/dmq_funcs.c

@@ -383,7 +383,6 @@ error:
 int cfg_dmq_t_replicate(struct sip_msg* msg, char* s)
 {
 	dmq_node_t* node;
-	struct socket_info* sock;
 	int i = 0;
 
 	/* avoid loops - do not replicate if message has come from another node
@@ -395,12 +394,6 @@ int cfg_dmq_t_replicate(struct sip_msg* msg, char* s)
 		return -1;
 	}
 
-	/* TODO - backup/restore original send socket */
-	sock = lookup_local_socket(&dmq_server_socket);
-	if (sock) {
-		set_force_socket(msg, sock);
-	}
-
 	lock_get(&node_list->lock);
 	node = node_list->nodes;
 	while(node) {

+ 3 - 44
modules/dmq/notification_peer.c

@@ -29,19 +29,13 @@
 str notification_content_type = str_init("text/plain");
 dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
 
-int *dmq_init_callback_done = 0;
-
-
 /**
  * @brief add notification peer
  */
 int add_notification_peer()
 {
 	dmq_peer_t not_peer;
-
-	memset(&not_peer, 0, sizeof(dmq_peer_t));
 	not_peer.callback = dmq_notification_callback;
-	not_peer.init_callback = NULL;
 	not_peer.description.s = "notification_peer";
 	not_peer.description.len = 17;
 	not_peer.peer_id.s = "notification_peer";
@@ -171,29 +165,10 @@ error:
 	return -1;
 }
 
-
-int run_init_callbacks() {
-	dmq_peer_t* crt;
-
-	if(peer_list==0) {
-		LM_WARN("peer list is null\n");
-		return 0;
-	}
-	crt = peer_list->peers;
-	while(crt) {
-		if (crt->init_callback) {
-			crt->init_callback();
-		}
-		crt = crt->next;
-	}
-	return 0;
-}
-
-
 /**
  * @brief dmq notification callback
  */
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
 {
 	int nodes_recv;
 	str* response_body = NULL;
@@ -231,10 +206,6 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_nod
 				&notification_callback, maxforwards, &notification_content_type);
 	}
 	pkg_free(response_body);
-	if (dmq_init_callback_done && !*dmq_init_callback_done) {
-		*dmq_init_callback_done = 1;
-		run_init_callbacks();
-	}
 	return 0;
 error:
 	return -1;
@@ -321,25 +292,13 @@ int notification_resp_callback_f(struct sip_msg* msg, int code,
 		dmq_node_t* node, void* param)
 {
 	int ret;
-	int nodes_recv;
-
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
-	if(code == 200) {
-		nodes_recv = extract_node_list(node_list, msg);
-		LM_DBG("received %d new or changed nodes\n", nodes_recv);
-		if (dmq_init_callback_done && !*dmq_init_callback_done) {
-			*dmq_init_callback_done = 1;
-			run_init_callbacks();
-		}
-	} else if(code == 408) {
+	if(code == 408) {
 		/* deleting node - the server did not respond */
 		LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri));
-		if (STR_EQ(node->orig_uri, dmq_notification_address)) {
-			LM_ERR("not deleting notification_peer\n");
-			return 0;
-		}
 		ret = del_dmq_node(node_list, node);
 		LM_DBG("del_dmq_node returned %d\n", ret);
 	}
 	return 0;
 }
+

+ 1 - 2
modules/dmq/notification_peer.h

@@ -34,10 +34,9 @@
 #include "dmq_funcs.h"
 
 extern str notification_content_type;
-extern int *dmq_init_callback_done;
 
 int add_notification_peer();
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
 int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg);
 str* build_notification_body();
 int build_node_str(dmq_node_t* node, char* buf, int buflen);

+ 1 - 1
modules/dmq/peer.c

@@ -97,7 +97,7 @@ dmq_peer_t* find_peer(str peer_id)
 /**
  * @empty callback
  */
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp)
 {
 	return 0;
 }

+ 2 - 5
modules/dmq/peer.h

@@ -28,7 +28,6 @@
 
 #include <string.h>
 #include <stdlib.h>
-#include "dmqnode.h"
 #include "../../lock_ops.h"
 #include "../../str.h"
 #include "../../mem/mem.h"
@@ -42,14 +41,12 @@ typedef struct peer_response {
 	str body;
 } peer_reponse_t;
 
-typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp, dmq_node_t* node);
-typedef int(*init_callback_t)();
+typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp);
 
 typedef struct dmq_peer {
 	str peer_id;
 	str description;
 	peer_callback_t callback;
-	init_callback_t init_callback;
 	struct dmq_peer* next;
 } dmq_peer_t;
 
@@ -67,7 +64,7 @@ typedef dmq_peer_t* (*register_dmq_peer_t)(dmq_peer_t*);
 
 dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
 dmq_peer_t* find_peer(str peer_id);
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp);
 
 #endif
 

+ 1 - 11
modules/dmq/worker.c

@@ -29,8 +29,6 @@
 #include "../../data_lump_rpl.h"
 #include "../../mod_fix.h"
 #include "../../sip_msg_clone.h"
-#include "../../parser/parse_from.h"
-#include "../../parser/parse_to.h"
 
 /**
  * @brief set the body of a response
@@ -80,7 +78,6 @@ void worker_loop(int id)
 	dmq_job_t* current_job;
 	peer_reponse_t peer_response;
 	int ret_value;
-	dmq_node_t *dmq_node = NULL;
 
 	worker = &workers[id];
 	for(;;) {
@@ -95,14 +92,7 @@ void worker_loop(int id)
 			current_job = job_queue_pop(worker->queue);
 			/* job_queue_pop might return NULL if queue is empty */
 			if(current_job) {
-				/* extract the from uri */
-				if (parse_from_header(current_job->msg) < 0) {
-					LM_ERR("bad sip message or missing From hdr\n");
-				} else {
-					dmq_node = find_dmq_node_uri(node_list, &((struct to_body*)current_job->msg->from->parsed)->uri);
-				}
-
-				ret_value = current_job->f(current_job->msg, &peer_response, dmq_node);
+				ret_value = current_job->f(current_job->msg, &peer_response);
 				if(ret_value < 0) {
 					LM_ERR("running job failed\n");
 					continue;