2
0
Эх сурвалжийг харах

added dmq notification peer changes

Marius Bucur 14 жил өмнө
parent
commit
04fa5b387d

+ 12 - 2
modules_k/dmq/dmq.c

@@ -50,6 +50,7 @@
 #include "peer.h"
 #include "bind_dmq.h"
 #include "worker.h"
+#include "notification_peer.h"
 #include "../../mod_fix.h"
 
 static int mod_init(void);
@@ -75,6 +76,8 @@ sl_api_t slb;
 /** module variables */
 dmq_worker_t* workers;
 dmq_peer_list_t* peer_list;
+// the dmq module is a peer itself for receiving notifications regarding nodes
+dmq_peer_t dmq_notification_peer;
 
 /** module functions */
 static int mod_init(void);
@@ -144,7 +147,7 @@ static int mod_init(void) {
 	/* load peer list - the list containing the module callbacks for dmq */
 	peer_list = init_peer_list();
 	
-	/* register worker processes */
+	/* register worker processes - add one because of the ping process */
 	register_procs(num_workers);
 	
 	/* allocate workers array */
@@ -154,12 +157,14 @@ static int mod_init(void) {
 		return -1;
 	}
 	
+	/* add first dmq peer - the dmq module itself to receive peer notify messages */
+	
 	startup_time = (int) time(NULL);
 	return 0;
 }
 
 /**
- * Initialize children
+ * initialize children
  */
 static int child_init(int rank) {
   	int i, newpid;
@@ -179,6 +184,11 @@ static int child_init(int rank) {
 				workers[i].pid = newpid;
 			}
 		}
+		/**
+		 * add the dmq notification peer.
+		 * the dmq is a peer itself so that it can receive node notifications
+		 */
+		add_notification_peer();
 		return 0;
 	}
 	if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {

+ 1 - 0
modules_k/dmq/dmq.h

@@ -12,6 +12,7 @@
 
 extern int num_workers;
 extern dmq_worker_t* workers;
+extern dmq_peer_t dmq_notification_peer;
 
 static inline int dmq_load_api(dmq_api_t* api) {
 	bind_dmq_f binddmq;

+ 11 - 0
modules_k/dmq/peer.c

@@ -3,6 +3,7 @@
 dmq_peer_list_t* init_peer_list() {
 	dmq_peer_list_t* peer_list = shm_malloc(sizeof(dmq_peer_list_t));
 	memset(peer_list, 0, sizeof(dmq_peer_list_t));
+	lock_init(&peer_list->lock);
 	return peer_list;
 }
 
@@ -15,6 +16,7 @@ dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
 		if(strncasecmp(cur->peer_id.s, peer->peer_id.s, len) == 0) {
 			return cur;
 		}
+		cur = cur->next;
 	}
 	return 0;
 }
@@ -22,17 +24,26 @@ dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
 void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
 	dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t));
 	*new_peer = *peer;
+	
+	/* copy the str's */
+	new_peer->peer_id.s = shm_malloc(peer->peer_id.len);
+	memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len);
+	new_peer->description.s = shm_malloc(peer->description.len);
+	memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len);
+	
 	new_peer->next = peer_list->peers;
 	peer_list->peers = new_peer;
 }
 
 int register_dmq_peer(dmq_peer_t* peer) {
+	lock_get(&peer_list->lock);
 	if(search_peer_list(peer_list, peer)) {
 		LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s,
 		       peer->description.len, peer->description.s);
 		return -1;
 	}
 	add_peer(peer_list, peer);
+	lock_release(&peer_list->lock);
 	return 0;
 }
 

+ 1 - 0
modules_k/dmq/peer.h

@@ -18,6 +18,7 @@ typedef struct dmq_peer {
 } dmq_peer_t;
 
 typedef struct dmq_peer_list {
+	gen_lock_t lock;
 	dmq_peer_t* peers;
 	int count;
 } dmq_peer_list_t;

+ 2 - 1
modules_k/dmq/worker.c

@@ -49,7 +49,8 @@ int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
 		}
 	}
 	if(!found_available) {
-		LM_DBG("no available worker found, passing job to the least busy one\n");
+		LM_DBG("no available worker found, passing job to the least busy one [%d %d]\n",
+		       worker->pid, job_queue_size(worker->queue));
 	}
 	job_queue_push(worker->queue, &new_job);
 	lock_release(&worker->lock);