Forráskód Böngészése

kazoo : changes in targeted exchanges

Luis Azedo 10 éve
szülő
commit
1f124fa5db
5 módosított fájl, 578 hozzáadás és 87 törlés
  1. 8 1
      modules/kazoo/kazoo.c
  2. 346 84
      modules/kazoo/kz_amqp.c
  3. 27 2
      modules/kazoo/kz_amqp.h
  4. 182 0
      modules/kazoo/kz_hash.c
  5. 15 0
      modules/kazoo/kz_hash.h

+ 8 - 1
modules/kazoo/kazoo.c

@@ -57,6 +57,7 @@ int kz_zone_counter = 0;
 int dbk_auth_wait_timeout = 3;
 int dbk_reconn_retries = 8;
 int dbk_presentity_phtable_size = 4096;
+int dbk_command_table_size = 2048;
 
 int dbk_use_federated_exchange = 1;
 str dbk_federated_exchange = str_init("federation");
@@ -189,6 +190,7 @@ static param_export_t params[] = {
     {"federated_exchange", STR_PARAM, &dbk_federated_exchange.s},
     {"amqp_heartbeats", INT_PARAM, &dbk_use_hearbeats},
     {"amqp_primary_zone", STR_PARAM, &dbk_primary_zone_name.s},
+    {"amqp_command_hashtable_size", INT_PARAM, &dbk_command_table_size},
     {0, 0, 0}
 };
 
@@ -347,14 +349,19 @@ static int mod_child_init(int rank)
 	if (rank==PROC_INIT || rank==PROC_TCP_MAIN)
 		return 0;
 
+//	if (rank>PROC_MAIN)
+//		kz_cmd_pipe = kz_cmd_pipe_fds[1];
+
 
 	if (rank==PROC_MAIN) {
+		/*
 		pid=fork_process(PROC_NOCHLDINIT, "AMQP Timer", 0);
 		if (pid<0)
-			return -1; /* error */
+			return -1;
 		if(pid==0){
 			return(kz_amqp_timeout_proc());
 		}
+		*/
 
 		for(i=0; i < dbk_consumer_processes; i++) {
 			pid=fork_process(i+1, "AMQP Consumer Worker", 1);

+ 346 - 84
modules/kazoo/kz_amqp.c

@@ -16,6 +16,7 @@
 
 #include "kz_amqp.h"
 #include "kz_json.h"
+#include "kz_hash.h"
 
 #define RET_AMQP_ERROR 2
 
@@ -58,6 +59,40 @@ int set_non_blocking(int fd)
 	return 0;
 }
 
+static inline str* kz_str_dup(str* src)
+{
+	char *dst_char = (char*)shm_malloc(sizeof(str)+src->len+1);
+	if (!dst_char) {
+		LM_ERR("error allocating shared memory for str");
+		return NULL;
+	}
+	str* dst = (str*)dst_char;
+	dst->s = dst_char+sizeof(str);
+
+	memcpy(dst->s, src->s, src->len);
+	dst->len = src->len;
+	dst->s[dst->len] = '\0';
+	return dst;
+}
+
+static inline str* kz_str_dup_from_char(char* src)
+{
+	int len = strlen(src);
+	char *dst_char = (char*)shm_malloc(sizeof(str)+len+1);
+	if (!dst_char) {
+		LM_ERR("error allocating shared memory for str");
+		return NULL;
+	}
+	str* dst = (str*)dst_char;
+	dst->s = dst_char+sizeof(str);
+
+	memcpy(dst->s, src, len);
+	dst->len = len;
+	dst->s[dst->len] = '\0';
+	return dst;
+}
+
+
 static char *kz_amqp_str_dup(str *src)
 {
 	char *res;
@@ -99,6 +134,7 @@ static char *kz_local_amqp_str_dup(str *src)
 	return res;
 }
 
+/*
 static char *kz_local_amqp_string_dup(char *src)
 {
 	char *res;
@@ -113,6 +149,7 @@ static char *kz_local_amqp_string_dup(char *src)
 	res[sz] = 0;
 	return res;
 }
+*/
 
 char *kz_amqp_bytes_dup(amqp_bytes_t bytes)
 {
@@ -257,6 +294,8 @@ void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd)
 		shm_free(cmd->payload);
 	if (cmd->return_payload)
 		shm_free(cmd->return_payload);
+	if (cmd->message_id)
+		shm_free(cmd->message_id);
 	lock_release(&cmd->lock);
 	lock_destroy(&cmd->lock);
 	shm_free(cmd);
@@ -431,6 +470,10 @@ int kz_amqp_init() {
 	int i;
 	kz_amqp_zone_ptr g;
 	kz_amqp_server_ptr s;
+
+	if(!kz_hash_init())
+		return 0;
+
 	if(kz_bindings == NULL) {
 		kz_bindings = (kz_amqp_bindings_ptr) shm_malloc(sizeof(kz_amqp_bindings));
 		memset(kz_bindings, 0, sizeof(kz_amqp_bindings));
@@ -487,6 +530,7 @@ kz_amqp_server_ptr kz_amqp_destroy_server(kz_amqp_server_ptr server_ptr)
     kz_amqp_server_ptr next = server_ptr->next;
 	kz_amqp_destroy_connection(server_ptr->connection);
 	kz_amqp_destroy_channels(server_ptr);
+	shm_free(server_ptr->producer);
 	shm_free(server_ptr);
 	return next;
 }
@@ -510,6 +554,7 @@ void kz_amqp_destroy_zones()
 	while(g) {
 		g = kz_amqp_destroy_zone(g);
 	}
+	shm_free(kz_zones);
 	kz_zones = NULL;
 	kz_primary_zone = NULL;
 }
@@ -528,6 +573,7 @@ void kz_amqp_destroy() {
 		}
 		shm_free(kz_bindings);
 	}
+	kz_hash_destroy();
 }
 
 #define KZ_URL_MAX_SIZE 100
@@ -916,7 +962,7 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
 	if(lock_init(&cmd->lock)==NULL)
 	{
 		LM_ERR("cannot init the lock for publishing in process %d\n", getpid());
-		lock_dealloc(&cmd->lock);
+//		lock_dealloc(&cmd->lock);
 		goto error;
 	}
 	lock_get(&cmd->lock);
@@ -983,6 +1029,7 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
 	cmd->routing_key = kz_amqp_str_dup(str_routing_key);
 	cmd->reply_routing_key = kz_amqp_string_dup(serverid);
 	cmd->payload = kz_amqp_string_dup(payload);
+	cmd->message_id = kz_str_dup(&unique_string);
 
 	cmd->timeout = *kz_timeout;
 
@@ -1475,76 +1522,82 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx )
 
 int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan)
 {
-	int ret = -1;
-	amqp_bytes_t federated_exchange = {0, 0};
-	amqp_bytes_t federated_routing_key = {0, 0};
+    int ret = -1;
+    amqp_bytes_t federated_exchange = {0, 0};
+    amqp_bytes_t federated_routing_key = {0, 0};
 	char _federated[100];
 
-	if(bind->federate == 0
-		|| dbk_use_federated_exchange == 0
-		|| kz_conn->server->zone == kz_amqp_get_primary_zone()) {
-			amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
-			if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) {
-				ret = -RET_AMQP_ERROR;
-				goto error;
-			}
+    if(bind->federate == 0
+    		|| dbk_use_federated_exchange == 0
+    		|| kz_conn->server->zone == kz_amqp_get_primary_zone()) {
+		amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
+		if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
+		{
+			ret = -RET_AMQP_ERROR;
+			goto error;
+		}
     }
 
-	if(bind->federate == 1
-		&& dbk_use_federated_exchange == 1
-		&& kz_conn->server->zone != kz_amqp_get_primary_zone()) {
-			federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s);
-			amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
-			if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn))) {
-				ret = -RET_AMQP_ERROR;
-				goto error;
-			}
+    if(bind->federate == 1
+    		&& dbk_use_federated_exchange == 1
+			&& kz_conn->server->zone != kz_amqp_get_primary_zone()) {
+    	federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s);
+		amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
+		if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn)))
+		{
+			ret = -RET_AMQP_ERROR;
+			goto error;
+		}
     }
 
 	amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, kz_amqp_empty_table);
-	if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) {
+	if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
+	{
 		ret = -RET_AMQP_ERROR;
 		goto error;
 	}
 
     if(bind->federate == 0
-		|| dbk_use_federated_exchange == 0
-		|| kz_conn->server->zone == kz_amqp_get_primary_zone()) {
-			if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0
-				|| kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn))) {
-				ret = -RET_AMQP_ERROR;
-				goto error;
-			}
+    		|| dbk_use_federated_exchange == 0
+    		|| kz_conn->server->zone == kz_amqp_get_primary_zone()) {
+		if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0
+			|| kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
+		{
+			ret = -RET_AMQP_ERROR;
+			goto error;
+		}
     }
 
-	if(bind->federate == 1
-		&& dbk_use_federated_exchange == 1
-		&& kz_conn->server->zone != kz_amqp_get_primary_zone()) {
+    if(bind->federate == 1
+    		&& dbk_use_federated_exchange == 1
+			&& kz_conn->server->zone != kz_amqp_get_primary_zone()) {
     	sprintf(_federated, "%.*s%s%.*s", (int)bind->exchange.len, (char*)bind->exchange.bytes,
-				(bind->routing_key.len == 0 ? "" : "."),
+    			(bind->routing_key.len == 0 ? "" : "."),
 				(int)bind->routing_key.len, (char*)bind->routing_key.bytes
-				);
-		federated_routing_key = kz_local_amqp_bytes_dup_from_string(_federated);
+    			);
+    	federated_routing_key = kz_local_amqp_bytes_dup_from_string(_federated);
 		if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, federated_exchange, federated_routing_key, kz_amqp_empty_table) < 0
-			|| kz_amqp_error("Binding queue to federated exchange", amqp_get_rpc_reply(kz_conn->conn))) {
-				ret = -RET_AMQP_ERROR;
-				goto error;
-		}
-	}
-
-	if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0
-		|| kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn))) {
+			|| kz_amqp_error("Binding queue to federated exchange", amqp_get_rpc_reply(kz_conn->conn)))
+		{
 			ret = -RET_AMQP_ERROR;
 			goto error;
-	}
+		}
+    }
+
+    if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0
+	    || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		ret = -RET_AMQP_ERROR;
+		goto error;
+    }
 
-	chan[idx].state = KZ_AMQP_CHANNEL_CONSUMING;
+    chan[idx].state = KZ_AMQP_CHANNEL_CONSUMING;
 	chan[idx].consumer = bind;
-	ret = idx;
-error:
-	kz_local_amqp_bytes_free(federated_exchange);
-	kz_local_amqp_bytes_free(federated_routing_key);
-	return ret;
+    ret = idx;
+ error:
+ 	 kz_local_amqp_bytes_free(federated_exchange);
+ 	 kz_local_amqp_bytes_free(federated_routing_key);
+     return ret;
 }
 
 int kz_amqp_send_ex(kz_amqp_server_ptr srv, kz_amqp_cmd_ptr cmd, kz_amqp_channel_state state, int idx)
@@ -1767,6 +1820,78 @@ void kz_amqp_fire_connection_event(char *event, char* host)
 	kz_amqp_send_consumer_event(payload, 1);
 }
 
+void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
+{
+	kz_amqp_cmd_ptr cmd = (kz_amqp_cmd_ptr) arg;
+	kz_amqp_cmd_ptr retrieved_cmd = kz_cmd_retrieve(cmd->message_id);
+	if(retrieved_cmd == NULL) {
+		LM_ERR("timer what!? ");
+	} else {
+		LM_ERR("amqp message timeout for exchange '%s' with routing key '%s' and message id '%.*s'\n"
+				, cmd->exchange, cmd->routing_key
+				, cmd->message_id->len, cmd->message_id->s
+				);
+		close(retrieved_cmd->timerfd);
+		event_del(retrieved_cmd->timer_ev);
+		pkg_free(retrieved_cmd->timer_ev);
+		retrieved_cmd->return_code = -1;
+		lock_release(&retrieved_cmd->lock);
+	}
+}
+
+
+int kz_amqp_start_cmd_timer(kz_amqp_cmd_ptr cmd)
+{
+	int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+
+	if (timerfd == -1) {
+		LM_ERR("Could not create timerfd.");
+		return 0;
+	}
+
+	cmd->timerfd = timerfd;
+	struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
+	if(itime == NULL){
+		LM_ERR("Could not set timer.");
+		return 0;
+	}
+	itime->it_interval.tv_sec = 0;
+	itime->it_interval.tv_nsec = 0;
+
+	itime->it_value.tv_sec = cmd->timeout.tv_sec;
+	itime->it_value.tv_nsec = cmd->timeout.tv_usec * 1000;
+	if (timerfd_settime(timerfd, 0, itime, NULL) == -1) {
+		LM_ERR("Could not set timer.");
+		return 0;
+	}
+	pkg_free(itime);
+	struct event *timer_ev = pkg_malloc(sizeof(struct event));
+	if(timer_ev == NULL) {
+		LM_ERR("Could not allocate timer_ev.");
+		return 0;
+	}
+	event_set(timer_ev, timerfd, EV_READ, kz_amqp_cmd_timeout_cb, cmd);
+	if(event_add(timer_ev, NULL) == -1) {
+		LM_ERR("event_add failed while setting request timer (%s).", strerror(errno));
+		pkg_free(timer_ev);
+		return 0;
+	}
+	cmd->timer_ev = timer_ev;
+	return 1;
+}
+
+void kz_amqp_stop_cmd_timer(kz_amqp_cmd_ptr cmd)
+{
+	if (cmd->timer_ev) {
+			close(cmd->timerfd);
+			event_del(cmd->timer_ev);
+			pkg_free(cmd->timer_ev);
+		} else {
+			LM_ERR("No timer for message id %.*s\n", cmd->message_id->len, cmd->message_id->s);
+		}
+}
+
+
 /* check timeouts */
 int kz_amqp_timeout_proc()
 {
@@ -1807,7 +1932,9 @@ int kz_amqp_connect(kz_amqp_conn_ptr rmq)
 {
 	int i,channel_res;
 	kz_amqp_cmd_ptr cmd;
-	kz_amqp_connection_close(rmq);
+	if(rmq->state != KZ_AMQP_CONNECTION_CLOSED) {
+		kz_amqp_connection_close(rmq);
+	}
 	rmq->state = KZ_AMQP_CONNECTION_CLOSED;
 	rmq->channel_count = rmq->channel_counter = 0;
     if (!(rmq->conn = amqp_new_connection())) {
@@ -1925,32 +2052,9 @@ int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection)
 	return 0;
 }
 
-
-void kz_amqp_publisher_connect()
+int kz_amqp_publisher_send(kz_amqp_cmd_ptr cmd)
 {
-	kz_amqp_zone_ptr g;
-	kz_amqp_server_ptr s;
-	for (g = kz_amqp_get_zones(); g != NULL; g = g->next) {
-		for (s = g->servers->head; s != NULL; s = s->next) {
-			if(s->producer == NULL) {
-				s->producer = (kz_amqp_conn_ptr) shm_malloc(sizeof(kz_amqp_conn));
-				memset(s->producer, 0, sizeof(kz_amqp_conn));
-				s->producer->server = s;
-			}
-			kz_amqp_connect(s->producer);
-		}
-	}
-}
-
-void kz_amqp_publisher_proc_cb(int fd, short event, void *arg)
-{
-	kz_amqp_cmd_ptr cmd;
     int idx;
-	if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
-		LM_ERR("failed to read from command pipe: %s\n", strerror(errno));
-		return;
-	}
-
 	int sent = 0;
 	kz_amqp_zone_ptr g;
 	kz_amqp_server_ptr s;
@@ -1958,39 +2062,123 @@ void kz_amqp_publisher_proc_cb(int fd, short event, void *arg)
 		for (s = g->servers->head; s != NULL && sent == 0; s = s->next) {
 			if(cmd->server_id == s->id || cmd->server_id == 0) {
 				if(s->producer->state == KZ_AMQP_CONNECTION_OPEN) {
-					if(cmd->type == KZ_AMQP_CMD_PUBLISH) {
+					if(cmd->type == KZ_AMQP_CMD_PUBLISH || cmd->type == KZ_AMQP_CMD_PUBLISH_BROADCAST) {
 						idx = kz_amqp_send(s, cmd);
 						if(idx >= 0) {
 							cmd->return_code = AMQP_RESPONSE_NORMAL;
+							s->channels[idx].state = KZ_AMQP_CHANNEL_FREE;
 							sent = 1;
 						} else {
 							cmd->return_code = -1;
-							LM_ERR("ERROR SENDING PUBLISH");
+							s->channels[idx].state = KZ_AMQP_CHANNEL_CLOSED;
+							LM_ERR("error sending publish to zone : %s , connection id : %d, uri : %s", s->zone->zone, s->id, s->connection->url);
 							kz_amqp_handle_server_failure(s->producer);
 						}
 						s->channels[idx].cmd = NULL;
-						s->channels[idx].state = KZ_AMQP_CHANNEL_FREE;
 					} else if(cmd->type == KZ_AMQP_CMD_CALL) {
 						idx = kz_amqp_send_receive(s, cmd);
 						if(idx < 0) {
 							s->channels[idx].cmd = NULL;
 							cmd->return_code = -1;
-							s->channels[idx].state = KZ_AMQP_CHANNEL_FREE;
-							LM_ERR("ERROR SENDING QUERY");
+							s->channels[idx].state = KZ_AMQP_CHANNEL_CLOSED;
+							LM_ERR("error sending query to zone : %s , connection id : %d, uri : %s", s->zone->zone, s->id, s->connection->url);
 							kz_amqp_handle_server_failure(s->producer);
 						} else {
+							s->channels[idx].state = KZ_AMQP_CHANNEL_FREE;
 							sent = 1;
 						}
 					}
 				}
 			}
 		}
+		if(cmd->type == KZ_AMQP_CMD_PUBLISH_BROADCAST) {
+			sent = 0;
+		}
 	}
-	if(sent == 0) {
-		LM_ERR("ERROR SENDING QUERY");
+	return sent;
+}
+
+
+void kz_amqp_publisher_connect()
+{
+	kz_amqp_zone_ptr g;
+	kz_amqp_server_ptr s;
+	for (g = kz_amqp_get_zones(); g != NULL; g = g->next) {
+		for (s = g->servers->head; s != NULL; s = s->next) {
+			if(s->producer == NULL) {
+				s->producer = (kz_amqp_conn_ptr) shm_malloc(sizeof(kz_amqp_conn));
+				memset(s->producer, 0, sizeof(kz_amqp_conn));
+				s->producer->server = s;
+			}
+			kz_amqp_connect(s->producer);
+		}
+	}
+}
+
+void kz_amqp_publisher_proc_cb(int fd, short event, void *arg)
+{
+	kz_amqp_cmd_ptr cmd;
+	kz_amqp_cmd_ptr retrieved_cmd;
+	if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
+		LM_ERR("failed to read from command pipe: %s\n", strerror(errno));
+		return;
 	}
-	if(sent == 0 || (sent == 1 && cmd->type == KZ_AMQP_CMD_PUBLISH))
+
+	switch(cmd->type) {
+	case KZ_AMQP_CMD_PUBLISH:
+		kz_amqp_publisher_send(cmd);
+		lock_release(&cmd->lock);
+		break;
+
+	case KZ_AMQP_CMD_CALL:
+		if(kz_amqp_publisher_send(cmd) < 0) {
+			lock_release(&cmd->lock);
+		} else {
+			if(!kz_cmd_store(cmd)) {
+				cmd->return_code = -1;
+				lock_release(&cmd->lock);
+			} else {
+				if(!kz_amqp_start_cmd_timer(cmd)) {
+					cmd->return_code = -1;
+					lock_release(&cmd->lock);
+				}
+			}
+		}
+		break;
+
+	case KZ_AMQP_CMD_TARGETED_CONSUMER:
+		retrieved_cmd = kz_cmd_retrieve(cmd->message_id);
+		if(retrieved_cmd == NULL) {
+			LM_ERR("amqp message id %.*s not found.\n", cmd->message_id->len, cmd->message_id->s);
+			kz_amqp_free_pipe_cmd(cmd);
+		} else {
+			kz_amqp_stop_cmd_timer(retrieved_cmd);
+			retrieved_cmd->return_code = cmd->return_code;
+			retrieved_cmd->return_payload = cmd->return_payload;
+			cmd->return_payload = NULL;
+			lock_release(&retrieved_cmd->lock);
+			kz_amqp_free_pipe_cmd(cmd);
+		}
+		break;
+
+	case KZ_AMQP_CMD_PUBLISH_BROADCAST:
+		kz_amqp_publisher_send(cmd);
 		lock_release(&cmd->lock);
+		break;
+
+	case KZ_AMQP_CMD_ASYNC_CALL:
+		break;
+
+	case KZ_AMQP_CMD_COLLECT:
+		break;
+
+	case KZ_AMQP_CMD_ASYNC_COLLECT:
+		break;
+
+	default:
+		break;
+
+	}
 }
 
 int kz_amqp_publisher_proc(int cmd_pipe)
@@ -2029,6 +2217,73 @@ char* maybe_add_consumer_key(int server_id, amqp_bytes_t body)
     return payload;
 }
 
+void kz_send_targeted_cmd(int server_id, amqp_bytes_t body)
+{
+    char buffer[100];
+    char* server_id_str = NULL;
+    kz_amqp_cmd_ptr cmd = NULL;
+    json_object* JObj = NULL;
+	char* payload = kz_local_amqp_bytes_dup(body);
+
+	if(payload == NULL) {
+		LM_ERR("error allocating message payload\n");
+		goto error;
+	}
+
+	json_obj_ptr json_obj = kz_json_parse(payload );
+    if (json_obj == NULL) {
+		LM_ERR("error parsing json payload\n");
+		goto error;
+    }
+
+	cmd = (kz_amqp_cmd_ptr)shm_malloc(sizeof(kz_amqp_cmd));
+	if(cmd == NULL) {
+		LM_ERR("failed to allocate kz_amqp_cmd in process %d\n", getpid());
+		goto error;
+	}
+	memset(cmd, 0, sizeof(kz_amqp_cmd));
+	if(lock_init(&cmd->lock)==NULL)
+	{
+		LM_ERR("cannot init the lock for targeted delivery in process %d\n", getpid());
+		goto error;
+	}
+
+	cmd->type = KZ_AMQP_CMD_TARGETED_CONSUMER;
+	cmd->return_code = AMQP_RESPONSE_NORMAL;
+
+    JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID);
+    if(JObj != NULL) {
+    	server_id_str = (char*) json_object_get_string(JObj);
+        sprintf(buffer, "consumer://%d/%s", server_id, server_id_str);
+        json_object_object_del(json_obj, BLF_JSON_SERVERID);
+        json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string(buffer));
+    }
+
+    cmd->return_payload = kz_amqp_string_dup((char*)json_object_to_json_string(json_obj));
+
+    JObj = kz_json_get_object(json_obj, BLF_JSON_MSG_ID);
+    if(JObj != NULL) {
+    	cmd->message_id = kz_str_dup_from_char((char*)json_object_get_string(JObj));
+    }
+
+	if (write(kz_cmd_pipe, &cmd, sizeof(cmd)) != sizeof(cmd)) {
+		LM_ERR("failed to publish message to amqp in process %d, write to command pipe: %s\n", getpid(), strerror(errno));
+	} else {
+		cmd = NULL;
+	}
+
+error:
+	if(json_obj)
+		json_object_put(json_obj);
+
+    if(payload)
+    	pkg_free(payload);
+
+    if(cmd)
+    	kz_amqp_free_pipe_cmd(cmd);
+
+}
+
 
 int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 {
@@ -2068,7 +2323,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 
     	for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) {
 			/* start cleanup */
-    		server_ptr->channels[i].consumer = NULL;
+//    		server_ptr->channels[i].consumer = NULL;
 			/* end cleanup */
 
 			/* bind targeted channels */
@@ -2133,6 +2388,9 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 			case AMQP_RESPONSE_NORMAL:
 				idx = envelope.channel-1;
 				if(idx < dbk_channels) {
+					kz_send_targeted_cmd(server_ptr->id, envelope.message.body);
+
+					/*
 					switch(server_ptr->channels[idx].state) {
 						case KZ_AMQP_CHANNEL_CALLING:
 							lock_get(&server_ptr->channels[idx].lock);
@@ -2149,6 +2407,10 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 							LM_INFO("ignoring received payload on consumer - %.*s\n", (int) envelope.message.body.len, (char*)envelope.message.body.bytes);
 							break;
 					}
+					*/
+
+
+
 				} else {
 					idx = idx - dbk_channels;
 					kz_amqp_send_consumer_event_ex(maybe_add_consumer_key(server_ptr->id, envelope.message.body),

+ 27 - 2
modules/kazoo/kz_amqp.h

@@ -30,7 +30,12 @@ typedef enum {
 	KZ_AMQP_CMD_PUBLISH     = 1,
 	KZ_AMQP_CMD_CALL    = 2,
 	KZ_AMQP_CMD_CONSUME = 3,
-	KZ_AMQP_CMD_ACK = 4
+	KZ_AMQP_CMD_ACK = 4,
+	KZ_AMQP_CMD_TARGETED_CONSUMER = 5,
+	KZ_AMQP_CMD_PUBLISH_BROADCAST = 6,
+	KZ_AMQP_CMD_COLLECT = 7,
+	KZ_AMQP_CMD_ASYNC_CALL    = 8,
+	KZ_AMQP_CMD_ASYNC_COLLECT    = 9
 } kz_amqp_pipe_cmd_type;
 
 typedef enum {
@@ -83,10 +88,11 @@ typedef struct {
 } kz_amqp_conn_pool, *kz_amqp_conn_pool_ptr;
 
 
+/*
 #define AMQP_KZ_CMD_PUBLISH       1
 #define AMQP_KZ_CMD_CALL          2
 #define AMQP_KZ_CMD_CONSUME       3
-
+*/
 
 typedef struct {
     gen_lock_t lock;
@@ -98,14 +104,31 @@ typedef struct {
 	char* queue;
 	char* payload;
 	char* return_payload;
+	str* message_id;
 	int   return_code;
 	int   consumer;
 	int   server_id;
 	uint64_t delivery_tag;
 	amqp_channel_t channel;
 	struct timeval timeout;
+
+	/* timer */
+	struct event *timer_ev;
+	int timerfd;
+
 } kz_amqp_cmd, *kz_amqp_cmd_ptr;
 
+typedef struct kz_amqp_cmd_entry_t {
+	kz_amqp_cmd_ptr cmd;
+	struct kz_amqp_cmd_entry_t* next;
+} kz_amqp_cmd_entry, *kz_amqp_cmd_entry_ptr;
+
+typedef struct kz_amqp_cmd_table_t {
+	kz_amqp_cmd_entry_ptr entries;
+	gen_lock_t lock;
+} kz_amqp_cmd_table, *kz_amqp_cmd_table_ptr;
+
+
 typedef struct {
 	char* payload;
 	uint64_t delivery_tag;
@@ -216,6 +239,8 @@ kz_amqp_zone_ptr kz_amqp_add_zone(char* zone);
 
 void kz_amqp_fire_connection_event(char *event, char* host);
 
+void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd);
+
 static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x)
 {
 	amqp_connection_close_t *mconn;

+ 182 - 0
modules/kazoo/kz_hash.c

@@ -0,0 +1,182 @@
+#include "kz_hash.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include "../../mem/shm_mem.h"
+#include "../../hashes.h"
+#include "../../dprint.h"
+#include "../../str.h"
+
+extern int dbk_command_table_size;
+
+kz_amqp_cmd_table_ptr kz_cmd_htable = NULL;
+
+int kz_hash_init()
+{
+	int i, j;
+
+	if(kz_cmd_htable)
+	{
+		LM_ERR("already initialized");
+		return 1;
+	}
+	
+	i = 0;
+	kz_cmd_htable = (kz_amqp_cmd_table_ptr)shm_malloc(dbk_command_table_size* sizeof(kz_amqp_cmd_table));
+	if(kz_cmd_htable == NULL)
+	{
+		LM_ERR("memory error allocating command table");
+		return 0;
+	}
+	memset(kz_cmd_htable, 0, dbk_command_table_size* sizeof(kz_amqp_cmd_table));
+
+	for(i= 0; i< dbk_command_table_size; i++)
+	{
+		if(lock_init(&kz_cmd_htable[i].lock)== 0)
+		{
+			LM_ERR("initializing lock [%d]\n", i);
+			goto error;
+		}
+		kz_cmd_htable[i].entries= (kz_amqp_cmd_entry_ptr)shm_malloc(sizeof(kz_amqp_cmd_entry));
+		if(kz_cmd_htable[i].entries== NULL)
+		{
+			LM_ERR("memory error allocating command entry");
+			return 0;
+		}
+		memset(kz_cmd_htable[i].entries, 0, sizeof(kz_amqp_cmd_entry));
+		kz_cmd_htable[i].entries->next= NULL;
+	}
+
+	return 1;
+
+error:
+	if(kz_cmd_htable)
+	{
+		for(j=0; j< i; j++)
+		{
+			if(kz_cmd_htable[i].entries)
+				shm_free(kz_cmd_htable[i].entries);
+			else 
+				break;
+			lock_destroy(&kz_cmd_htable[i].lock);
+		}
+		shm_free(kz_cmd_htable);
+	}
+	return 0;
+
+}
+
+void kz_hash_destroy()
+{
+	int i;
+	kz_amqp_cmd_entry_ptr p, prev_p;
+
+	if(kz_cmd_htable== NULL)
+		return;
+
+	for(i= 0; i< dbk_command_table_size; i++)
+	{
+		lock_destroy(&kz_cmd_htable[i].lock);
+		p= kz_cmd_htable[i].entries;
+		while(p)
+		{
+			prev_p= p;
+			p= p->next;
+			kz_amqp_free_pipe_cmd(prev_p->cmd);
+			shm_free(prev_p);
+		}
+	}
+	shm_free(kz_cmd_htable);
+}
+
+kz_amqp_cmd_entry_ptr kz_search_cmd_table(str* message_id, unsigned int hash_code)
+{
+	kz_amqp_cmd_entry_ptr p;
+
+	LM_DBG("searching %.*s\n", message_id->len,  message_id->s);
+	p= kz_cmd_htable[hash_code].entries->next;
+	while(p)
+	{
+		if(p->cmd->message_id->len== message_id->len &&
+				strncmp(p->cmd->message_id->s, message_id->s, p->cmd->message_id->len) == 0 )
+			return p;
+		p= p->next;
+	}
+	return NULL;
+}
+
+int kz_cmd_store(kz_amqp_cmd_ptr cmd)
+{
+	unsigned int hash_code;
+	kz_amqp_cmd_entry_ptr p= NULL;
+
+	hash_code = core_hash(cmd->message_id, NULL, dbk_command_table_size);
+
+	lock_get(&kz_cmd_htable[hash_code].lock);
+	
+	p= kz_search_cmd_table(cmd->message_id, hash_code);
+	if(p)
+	{
+		LM_ERR("command already stored\n");
+		lock_release(&kz_cmd_htable[hash_code].lock);
+		return 0;
+	}
+
+	p = shm_malloc(sizeof(kz_amqp_cmd_entry));
+	if(p== NULL)
+	{
+		lock_release(&kz_cmd_htable[hash_code].lock);
+		LM_ERR("memory error allocation command pointer\n");
+		return 0;
+	}
+	memset(p, 0, sizeof(kz_amqp_cmd_entry));
+
+	p->cmd = cmd;
+	p->next= kz_cmd_htable[hash_code].entries->next;
+	kz_cmd_htable[hash_code].entries->next= p;
+
+	lock_release(&kz_cmd_htable[hash_code].lock);
+	
+	return 1;
+}
+
+kz_amqp_cmd_ptr kz_cmd_retrieve(str* message_id)
+{
+	unsigned int hash_code;
+	kz_amqp_cmd_entry_ptr p= NULL, prev_p= NULL;
+	kz_amqp_cmd_ptr cmd = NULL;
+
+	hash_code= core_hash(message_id, NULL, dbk_command_table_size);
+
+	lock_get(&kz_cmd_htable[hash_code].lock);
+
+	p = kz_search_cmd_table(message_id, hash_code);
+	if(p== NULL)
+	{
+		LM_DBG("command pointer hash entry not found\n");
+		lock_release(&kz_cmd_htable[hash_code].lock);
+		return NULL;
+	}
+
+	prev_p = kz_cmd_htable[hash_code].entries;
+	while(prev_p->next)
+	{
+		if(prev_p->next== p)
+			break;
+		prev_p= prev_p->next;
+	}
+	if(prev_p->next== NULL)
+	{
+		LM_ERR("command pointer not found\n");
+		lock_release(&kz_cmd_htable[hash_code].lock);
+		return NULL;
+	}
+	prev_p->next= p->next;
+	cmd = p->cmd;
+	shm_free(p);
+	lock_release(&kz_cmd_htable[hash_code].lock);
+
+	return cmd;
+}
+
+

+ 15 - 0
modules/kazoo/kz_hash.h

@@ -0,0 +1,15 @@
+#ifndef KZ_HASH_H
+#define KZ_HASH_H
+
+#include "../../lock_ops.h"
+#include "kz_amqp.h"
+
+int kz_hash_init();
+void kz_hash_destroy();
+
+int kz_cmd_store(kz_amqp_cmd_ptr cmd);
+kz_amqp_cmd_ptr kz_cmd_retrieve(str* message_id);
+
+
+#endif
+