Преглед на файлове

added support for binding the dmq module within another module.

also, finished the implementation for the dmq worker queues
Marius Bucur преди 14 години
родител
ревизия
ba31d863c9
променени са 7 файла, в които са добавени 279 реда и са изтрити 0 реда
  1. 7 0
      modules_k/dmq/bind_dmq.c
  2. 29 0
      modules_k/dmq/message.c
  3. 2 0
      modules_k/dmq/message.h
  4. 43 0
      modules_k/dmq/peer.c
  5. 35 0
      modules_k/dmq/peer.h
  6. 120 0
      modules_k/dmq/worker.c
  7. 43 0
      modules_k/dmq/worker.h

+ 7 - 0
modules_k/dmq/bind_dmq.c

@@ -0,0 +1,7 @@
+#include "bind_dmq.h"
+#include "peer.h"
+
+int bind_dmq(dmq_api_t* api) {
+	api->register_dmq_peer = register_dmq_peer;
+	return 0;
+}

+ 29 - 0
modules_k/dmq/message.c

@@ -0,0 +1,29 @@
+#include "../../parser/parse_to.h"
+#include "../../parser/parse_uri.h" 
+#include "../../parser/parse_content.h"
+#include "../../parser/parse_from.h"
+#include "../../ut.h"
+#include "worker.h"
+#include "peer.h"
+#include "message.h"
+
+int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
+	dmq_peer_t* peer;
+	if ((parse_sip_msg_uri(msg) < 0) || (!msg->parsed_uri.user.s)) {
+			LM_ERR("cannot parse msg URI\n");
+			return -1;
+	}
+	LM_DBG("handle_dmq_message [%.*s %.*s] [%s %s]\n",
+	       msg->first_line.u.request.method.len, msg->first_line.u.request.method.s,
+	       msg->first_line.u.request.uri.len, msg->first_line.u.request.uri.s,
+	       ZSW(str1), ZSW(str2));
+	/* the peer id is given as the userinfo part of the request URI */
+	peer = find_peer(msg->parsed_uri.user);
+	if(!peer) {
+		LM_DBG("no peer found for %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+		return 0;
+	}
+	LM_DBG("handle_dmq_message peer found: %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+	add_dmq_job(msg, peer);
+	return 0;
+}

+ 2 - 0
modules_k/dmq/message.h

@@ -0,0 +1,2 @@
+
+int handle_dmq_message(struct sip_msg*, char*, char*);

+ 43 - 0
modules_k/dmq/peer.c

@@ -0,0 +1,43 @@
+#include "peer.h"
+
+dmq_peer_list_t* init_peer_list() {
+	dmq_peer_list_t* peer_list = shm_malloc(sizeof(dmq_peer_list_t));
+	memset(peer_list, 0, sizeof(dmq_peer_list_t));
+	return peer_list;
+}
+
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+	dmq_peer_t* cur = peer_list->peers;
+	int len;
+	while(cur) {
+		/* len - the minimum length of the two strings */
+		len = cur->peer_id.len < peer->peer_id.len ? cur->peer_id.len:peer->peer_id.len;
+		if(strncasecmp(cur->peer_id.s, peer->peer_id.s, len) == 0) {
+			return cur;
+		}
+	}
+	return 0;
+}
+
+void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+	dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t));
+	*new_peer = *peer;
+	new_peer->next = peer_list->peers;
+	peer_list->peers = new_peer;
+}
+
+int register_dmq_peer(dmq_peer_t* peer) {
+	if(search_peer_list(peer_list, peer)) {
+		LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s,
+		       peer->description.len, peer->description.s);
+		return -1;
+	}
+	add_peer(peer_list, peer);
+	return 0;
+}
+
+dmq_peer_t* find_peer(str peer_id) {
+	dmq_peer_t foo_peer;
+	foo_peer.peer_id = peer_id;
+	return search_peer_list(peer_list, &foo_peer);
+}

+ 35 - 0
modules_k/dmq/peer.h

@@ -0,0 +1,35 @@
+#ifndef PEER_H
+#define PEER_H
+
+#include <string.h>
+#include <stdlib.h>
+#include "../../str.h"
+#include "../../mem/mem.h"
+#include "../../mem/shm_mem.h"
+#include "../../parser/msg_parser.h"
+
+typedef int(*peer_callback_t)(struct sip_msg*);
+
+typedef struct dmq_peer {
+	str peer_id;
+	str description;
+	peer_callback_t callback;
+	struct dmq_peer* next;
+} dmq_peer_t;
+
+typedef struct dmq_peer_list {
+	dmq_peer_t* peers;
+	int count;
+} dmq_peer_list_t;
+
+extern dmq_peer_list_t* peer_list;
+
+dmq_peer_list_t* init_peer_list();
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
+typedef int (*register_dmq_peer_t)(dmq_peer_t*);
+
+int register_dmq_peer(dmq_peer_t* peer);
+dmq_peer_t* find_peer(str peer_id);
+
+
+#endif

+ 120 - 0
modules_k/dmq/worker.c

@@ -0,0 +1,120 @@
+#include "dmq.h"
+#include "worker.h"
+
+void worker_loop(int id) {
+	dmq_worker_t* worker = &workers[id];
+	dmq_job_t* current_job;
+	int ret_value;
+	for(;;) {
+		LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
+		lock_get(&worker->lock);
+		LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
+		/* multiple lock_release calls might be performed, so remove from queue until empty */
+		do {
+			current_job = job_queue_pop(worker->queue);
+			/* job_queue_pop might return NULL if queue is empty */
+			if(current_job) {
+				ret_value = current_job->f(current_job->msg);
+				if(ret_value < 0) {
+					LM_ERR("running job failed\n");
+				}
+				shm_free(current_job);
+				worker->jobs_processed++;
+			}
+		} while(job_queue_size(worker->queue) > 0);
+	}
+}
+
+int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
+	int i, found_available = 0;
+	dmq_job_t new_job;
+	dmq_worker_t* worker;
+	new_job.f = peer->callback;
+	new_job.msg = msg;
+	new_job.orig_peer = peer;
+	if(!num_workers) {
+		LM_ERR("error in add_dmq_job no workers spawned\n");
+		return -1;
+	}
+	/* initialize the worker with the first one */
+	worker = workers;
+	/* search for an available worker, or, if not possible, for the least busy one */
+	for(i = 0; i < num_workers; i++) {
+		if(job_queue_size(workers[i].queue) == 0) {
+			worker = &workers[i];
+			found_available = 1;
+			break;
+		} else if(job_queue_size(workers[i].queue) < job_queue_size(worker->queue)) {
+			worker = &workers[i];
+		}
+	}
+	if(!found_available) {
+		LM_DBG("no available worker found, passing job to the least busy one\n");
+	}
+	job_queue_push(worker->queue, &new_job);
+	lock_release(&worker->lock);
+	return 0;
+}
+
+void init_worker(dmq_worker_t* worker) {
+	memset(worker, 0, sizeof(*worker));
+	lock_init(&worker->lock);
+	// acquire the lock for the first time - so that dmq_worker_loop blocks
+	lock_get(&worker->lock);
+	worker->queue = alloc_job_queue();
+}
+
+job_queue_t* alloc_job_queue() {
+	job_queue_t* queue = shm_malloc(sizeof(job_queue_t));
+	atomic_set(&queue->count, 0);
+	queue->front = NULL;
+	queue->back = NULL;
+	lock_init(&queue->lock);
+	return queue;
+}
+
+void destroy_job_queue(job_queue_t* queue) {
+	shm_free(queue);
+}
+
+int job_queue_size(job_queue_t* queue) {
+	return atomic_get(&queue->count);
+}
+
+void job_queue_push(job_queue_t* queue, dmq_job_t* job) {
+	/* we need to copy the dmq_job into a newly created dmq_job in shm */
+	dmq_job_t* newjob = shm_malloc(sizeof(dmq_job_t));
+	*newjob = *job;
+	
+	lock_get(&queue->lock);
+	newjob->prev = NULL;
+	newjob->next = queue->back;
+	if(queue->back) {
+		queue->back->prev = newjob;
+	}
+	queue->back = newjob;
+	if(!queue->front) {
+		queue->front = newjob;
+	}
+	atomic_inc(&queue->count);
+	lock_release(&queue->lock);
+}
+dmq_job_t* job_queue_pop(job_queue_t* queue) {
+	dmq_job_t* front;
+	lock_get(&queue->lock);
+	if(!queue->front) {
+		lock_release(&queue->lock);
+		return NULL;
+	}
+	front = queue->front;
+	if(front->prev) {
+		queue->front = front->prev;
+		front->prev->next = NULL;
+	} else {
+		queue->front = NULL;
+		queue->back = NULL;
+	}
+	atomic_dec(&queue->count);
+	lock_release(&queue->lock);
+	return front;
+}

+ 43 - 0
modules_k/dmq/worker.h

@@ -0,0 +1,43 @@
+#ifndef DMQ_WORKER_H
+#define DMQ_WORKER_H
+
+#include "peer.h"
+#include "../../locking.h"
+#include "../../atomic_ops.h"
+#include "../../parser/msg_parser.h"
+
+typedef struct dmq_job {
+	peer_callback_t f;
+	struct sip_msg* msg;
+	dmq_peer_t* orig_peer;
+	struct dmq_job* next;
+	struct dmq_job* prev;
+} dmq_job_t;
+
+typedef struct job_queue {
+	atomic_t count;
+	struct dmq_job* back;
+	struct dmq_job* front;
+	gen_lock_t lock;
+} job_queue_t;
+
+struct dmq_worker {
+	job_queue_t* queue;
+	int jobs_processed;
+	gen_lock_t lock;
+	int pid;
+};
+
+typedef struct dmq_worker dmq_worker_t;
+
+void init_worker(dmq_worker_t* worker);
+int add_dmq_job(struct sip_msg*, dmq_peer_t*);
+void worker_loop(int id);
+
+job_queue_t* alloc_job_queue();
+void destroy_job_queue(job_queue_t* queue);
+void job_queue_push(job_queue_t* queue, dmq_job_t* job);
+dmq_job_t* job_queue_pop(job_queue_t* queue);
+int job_queue_size(job_queue_t* queue);
+
+#endif