소스 검색

kazoo : first approach to multiple consumers

Luis Azedo 10 년 전
부모
커밋
64ccb0f927
3개의 변경된 파일80개의 추가작업 그리고 47개의 파일을 삭제
  1. 20 14
      modules/kazoo/kazoo.c
  2. 57 30
      modules/kazoo/kz_amqp.c
  3. 3 3
      modules/kazoo/kz_amqp.h

+ 20 - 14
modules/kazoo/kazoo.c

@@ -41,7 +41,8 @@
 #include "kz_trans.h"
 #include "kz_pua.h"
 
-#define DBK_DEFAULT_NO_CONSUMERS 8
+#define DBK_DEFAULT_NO_CONSUMERS 1
+#define DBK_DEFAULT_NO_WORKERS 8
 
 static int mod_init(void);
 static int  mod_child_init(int rank);
@@ -71,6 +72,7 @@ int dbk_create_empty_dialog = 1;
 int dbk_channels = 50;
 
 int dbk_consumer_processes = DBK_DEFAULT_NO_CONSUMERS;
+int dbk_consumer_workers = DBK_DEFAULT_NO_WORKERS;
 
 struct timeval kz_sock_tv = (struct timeval){0,100000};
 struct timeval kz_amqp_tv = (struct timeval){0,100000};
@@ -176,6 +178,7 @@ static param_export_t params[] = {
     {"amqp_waitframe_timeout_micro", INT_PARAM, &kz_amqp_tv.tv_usec},
     {"amqp_waitframe_timeout_sec", INT_PARAM, &kz_amqp_tv.tv_sec},
     {"amqp_consumer_processes", INT_PARAM, &dbk_consumer_processes},
+    {"amqp_consumer_workers", INT_PARAM, &dbk_consumer_workers},
     {"amqp_consumer_event_key", STR_PARAM, &dbk_consumer_event_key.s},
     {"amqp_consumer_event_subkey", STR_PARAM, &dbk_consumer_event_subkey.s},
     {"amqp_query_timeout_micro", INT_PARAM, &kz_qtimeout_tv.tv_usec},
@@ -309,7 +312,7 @@ static int mod_init(void) {
     }
 
 
-    int total_workers = dbk_consumer_processes + 2 + kz_server_counter;
+    int total_workers = dbk_consumer_workers + (dbk_consumer_processes * kz_server_counter) + 2;
 
     register_procs(total_workers);
     cfg_register_child(total_workers);
@@ -319,9 +322,9 @@ static int mod_init(void) {
 		return -1;
 	}
 
-    kz_worker_pipes_fds = (int*) shm_malloc(sizeof(int) * (dbk_consumer_processes) * 2 );
-    kz_worker_pipes = (int*) shm_malloc(sizeof(int) * dbk_consumer_processes );
-    for(i=0; i < dbk_consumer_processes; i++) {
+    kz_worker_pipes_fds = (int*) shm_malloc(sizeof(int) * (dbk_consumer_workers) * 2 );
+    kz_worker_pipes = (int*) shm_malloc(sizeof(int) * dbk_consumer_workers);
+    for(i=0; i < dbk_consumer_workers; i++) {
     	kz_worker_pipes_fds[i*2] = kz_worker_pipes_fds[i*2+1] = -1;
 		if (pipe(&kz_worker_pipes_fds[i*2]) < 0) {
 			LM_ERR("worker pipe(%d) failed\n", i);
@@ -330,7 +333,7 @@ static int mod_init(void) {
     }
 
 	kz_cmd_pipe = kz_cmd_pipe_fds[1];
-	for(i=0; i < dbk_consumer_processes; i++) {
+	for(i=0; i < dbk_consumer_workers; i++) {
 		kz_worker_pipes[i] = kz_worker_pipes_fds[i*2+1];
 	}
 
@@ -374,23 +377,26 @@ static int mod_child_init(int rank)
 		}
 		*/
 
-		for(i=0; i < dbk_consumer_processes; i++) {
+		for(i=0; i < dbk_consumer_workers; i++) {
 			pid=fork_process(i+1, "AMQP Consumer Worker", 1);
 			if (pid<0)
 				return -1; /* error */
 			if(pid==0){
 				close(kz_worker_pipes_fds[i*2+1]);
-				kz_amqp_consumer_worker_proc(kz_worker_pipes_fds[i*2]);
+				return(kz_amqp_consumer_worker_proc(kz_worker_pipes_fds[i*2]));
 			}
 		}
 
 		for (g = kz_amqp_get_zones(); g != NULL; g = g->next) {
-			for (s = g->servers->head; s != NULL; s = s->next) {
-				pid=fork_process(PROC_NOCHLDINIT, "AMQP Consumer", 0);
-				if (pid<0)
-					return -1; /* error */
-				if(pid==0){
-					return(kz_amqp_consumer_proc(s));
+			int w = (g == kz_amqp_get_primary_zone() ? dbk_consumer_processes : 1);
+			for(i=0; i < w; i++) {
+				for (s = g->servers->head; s != NULL; s = s->next) {
+					pid=fork_process(PROC_NOCHLDINIT, "AMQP Consumer", 0);
+					if (pid<0)
+						return -1; /* error */
+					if(pid==0){
+						return(kz_amqp_consumer_proc(s));
+					}
 				}
 			}
 		}

+ 57 - 30
modules/kazoo/kz_amqp.c

@@ -1699,7 +1699,7 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx )
 		goto error;
     }
 
-    amqp_queue_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table);
+    amqp_queue_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, 0, 0, 0, 1, kz_amqp_empty_table);
     if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
     {
 		goto error;
@@ -1711,7 +1711,7 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx )
 		goto error;
     }
 
-    if (amqp_basic_consume(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, 1, 1, kz_amqp_empty_table) < 0
+    if (amqp_basic_consume(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, 1, 0, kz_amqp_empty_table) < 0
 	    || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
     {
 		goto error;
@@ -2000,7 +2000,7 @@ void kz_amqp_send_consumer_event_ex(char* payload, char* event_key, char* event_
 
 	if(nextConsumer) {
 		consumer++;
-		if(consumer >= dbk_consumer_processes) {
+		if(consumer >= dbk_consumer_workers) {
 			consumer = 0;
 		}
 	}
@@ -2048,7 +2048,7 @@ int kz_send_worker_error_event(kz_amqp_cmd_ptr cmd)
 	ptr->cmd = cmd;
 
 	consumer++;
-	if(consumer >= dbk_consumer_processes) {
+	if(consumer >= dbk_consumer_workers) {
 		consumer = 0;
 	}
 
@@ -2633,7 +2633,7 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
 	}
 
 	consumer++;
-	if(consumer >= dbk_consumer_processes) {
+	if(consumer >= dbk_consumer_workers) {
 		consumer = 0;
 	}
 
@@ -2662,28 +2662,42 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 	int OK;
 	char* payload;
 	int channel_res;
+	kz_amqp_conn_ptr consumer = NULL;
+	kz_amqp_channel_ptr consumer_channels = NULL;
+	int channel_base = 0;
 
-    server_ptr->consumer = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn));
-    if(server_ptr->consumer == NULL)
+	if(server_ptr->zone == kz_amqp_get_primary_zone())
+		channel_base = dbk_channels;
+
+//    server_ptr->consumer = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn));
+//    if(server_ptr->consumer == NULL)
+	consumer = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn));
+    if(consumer == NULL)
     {
     	LM_ERR("NO MORE PACKAGE MEMORY\n");
     	return 1;
     }
-    memset(server_ptr->consumer, 0, sizeof(kz_amqp_conn));
-    server_ptr->consumer->server = server_ptr;
-
-    server_ptr->consumer_channels = (kz_amqp_channel_ptr)pkg_malloc(sizeof(kz_amqp_channel)*bindings_count);
-    if(server_ptr->consumer_channels == NULL)
+//    memset(server_ptr->consumer, 0, sizeof(kz_amqp_conn));
+//    server_ptr->consumer->server = server_ptr;
+    memset(consumer, 0, sizeof(kz_amqp_conn));
+    consumer->server = server_ptr;
+
+    consumer_channels = (kz_amqp_channel_ptr)pkg_malloc(sizeof(kz_amqp_channel)*bindings_count);
+//    server_ptr->consumer_channels = (kz_amqp_channel_ptr)pkg_malloc(sizeof(kz_amqp_channel)*bindings_count);
+//    if(server_ptr->consumer_channels == NULL)
+    if(consumer_channels == NULL)
     {
     	LM_ERR("NO MORE PACKAGE MEMORY\n");
     	return 1;
     }
 	for(i=0; i < bindings_count; i++)
-		server_ptr->consumer_channels[i].channel = dbk_channels + i + 1;
+//		server_ptr->consumer_channels[i].channel = dbk_channels + i + 1;
+		consumer_channels[i].channel = channel_base + i + 1;
 
     while(1) {
     	OK = 1;
-   		if(kz_amqp_connection_open(server_ptr->consumer)) {
+//   		if(kz_amqp_connection_open(server_ptr->consumer)) {
+   		if(kz_amqp_connection_open(consumer)) {
    			LM_ERR("Error opening connection\n");
    			sleep(3);
    			continue;
@@ -2692,21 +2706,24 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 
     	/* reset channels */
 
-    	for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) {
+    	for(i=0,channel_res=0; i < channel_base && channel_res == 0; i++) {
 			/* start cleanup */
 //    		server_ptr->channels[i].consumer = NULL;
 			/* end cleanup */
 
 			/* bind targeted channels */
-			channel_res = kz_amqp_channel_open(server_ptr->consumer, server_ptr->channels[i].channel);
+//			channel_res = kz_amqp_channel_open(server_ptr->consumer, server_ptr->channels[i].channel);
+			channel_res = kz_amqp_channel_open(consumer, server_ptr->channels[i].channel);
 			if(channel_res == 0) {
-				kz_amqp_bind_targeted_channel(server_ptr->consumer, i);
+//				kz_amqp_bind_targeted_channel(server_ptr->consumer, i);
+				kz_amqp_bind_targeted_channel(consumer, i);
 			}
     	}
 
     	for(i=0,channel_res=0; i < bindings_count && channel_res == 0; i++) {
 			/* start cleanup */
-    		server_ptr->consumer_channels[i].consumer = NULL;
+//    		server_ptr->consumer_channels[i].consumer = NULL;
+    		consumer_channels[i].consumer = NULL;
 			/* end cleanup */
     	}
 
@@ -2716,10 +2733,13 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 			kz_amqp_binding_ptr binding = kz_bindings->head;
 			while(binding != NULL && OK) {
 				if(binding->bind->federate || server_ptr->zone == kz_amqp_get_primary_zone()) {
-					channel_res = kz_amqp_channel_open(server_ptr->consumer, server_ptr->consumer_channels[i].channel);
+//					channel_res = kz_amqp_channel_open(server_ptr->consumer, server_ptr->consumer_channels[i].channel);
+					channel_res = kz_amqp_channel_open(consumer, consumer_channels[i].channel);
 					if(channel_res == 0) {
-						kz_amqp_bind_consumer(server_ptr->consumer, binding->bind, i, server_ptr->consumer_channels);
-						server_ptr->consumer_channels[i].state = KZ_AMQP_CHANNEL_BINDED;
+//						kz_amqp_bind_consumer(server_ptr->consumer, binding->bind, i, server_ptr->consumer_channels);
+						kz_amqp_bind_consumer(consumer, binding->bind, i, consumer_channels);
+//						server_ptr->consumer_channels[i].state = KZ_AMQP_CHANNEL_BINDED;
+						consumer_channels[i].state = KZ_AMQP_CHANNEL_BINDED;
 						i++;
 					} else {
 						LM_ERR("Error opening channel %d in server %s\n", i, server_ptr->connection->url);
@@ -2735,8 +2755,10 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 		while(OK) {
 			payload = NULL;
 			amqp_envelope_t envelope;
-			amqp_maybe_release_buffers(server_ptr->consumer->conn);
-			amqp_rpc_reply_t reply = amqp_consume_message(server_ptr->consumer->conn, &envelope, NULL, 0);
+//			amqp_maybe_release_buffers(server_ptr->consumer->conn);
+			amqp_maybe_release_buffers(consumer->conn);
+//			amqp_rpc_reply_t reply = amqp_consume_message(server_ptr->consumer->conn, &envelope, NULL, 0);
+			amqp_rpc_reply_t reply = amqp_consume_message(consumer->conn, &envelope, NULL, 0);
 			switch(reply.reply_type) {
 			case AMQP_RESPONSE_LIBRARY_EXCEPTION:
 				switch(reply.library_error) {
@@ -2748,7 +2770,8 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 					break;
 				case AMQP_STATUS_UNEXPECTED_STATE:
 					LM_DBG("AMQP_STATUS_UNEXPECTED_STATE\n");
-					OK = kz_amqp_consume_error(server_ptr->consumer);
+//					OK = kz_amqp_consume_error(server_ptr->consumer);
+					OK = kz_amqp_consume_error(consumer);
 					break;
 				default:
 					OK = 0;
@@ -2758,18 +2781,21 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 
 			case AMQP_RESPONSE_NORMAL:
 				idx = envelope.channel-1;
-				if(idx < dbk_channels) {
+				if(idx < channel_base) {
 					kz_amqp_send_worker_event(server_ptr->id, &envelope, NULL);
 				} else {
-					idx = idx - dbk_channels;
-					if(!server_ptr->consumer_channels[idx].consumer->no_ack ) {
-						if(amqp_basic_ack(server_ptr->consumer->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) {
+					idx = idx - channel_base;
+//					if(!server_ptr->consumer_channels[idx].consumer->no_ack ) {
+					if(!consumer_channels[idx].consumer->no_ack ) {
+//						if(amqp_basic_ack(server_ptr->consumer->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) {
+						if(amqp_basic_ack(consumer->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) {
 							LM_ERR("AMQP ERROR TRYING TO ACK A MSG\n");
 							OK = 0;
 						}
 					}
 					if(OK)
-						kz_amqp_send_worker_event(server_ptr->id, &envelope, server_ptr->consumer_channels[idx].consumer);
+//						kz_amqp_send_worker_event(server_ptr->id, &envelope, server_ptr->consumer_channels[idx].consumer);
+						kz_amqp_send_worker_event(server_ptr->id, &envelope, consumer_channels[idx].consumer);
 				}
 				/*
 				idx = envelope.channel-1;
@@ -2803,7 +2829,8 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 			amqp_destroy_envelope(&envelope);
 		}
 
-    	kz_amqp_connection_close(server_ptr->consumer);
+//    	kz_amqp_connection_close(server_ptr->consumer);
+    	kz_amqp_connection_close(consumer);
 
     }
     return 0;

+ 3 - 3
modules/kazoo/kz_amqp.h

@@ -54,7 +54,7 @@ extern int dbk_channels;
 extern str dbk_node_hostname;
 extern str dbk_consumer_event_key;
 extern str dbk_consumer_event_subkey;
-extern int dbk_consumer_processes;
+extern int dbk_consumer_workers;
 
 typedef struct kz_amqp_connection_t {
 	kz_amqp_connection_info info;
@@ -197,9 +197,9 @@ typedef struct kz_amqp_server_t {
 	struct kz_amqp_zone_t* zone;
 	kz_amqp_connection_ptr connection;
 	kz_amqp_conn_ptr producer;
-	kz_amqp_conn_ptr consumer;
+//	kz_amqp_conn_ptr consumer;
 	kz_amqp_channel_ptr channels;
-	kz_amqp_channel_ptr consumer_channels;
+//	kz_amqp_channel_ptr consumer_channels;
     struct kz_amqp_server_t* next;
 } kz_amqp_server, *kz_amqp_server_ptr;