浏览代码

kazoo : add async query feature

suspend the transaction on send and continue on return or timeout
Luis Azedo 10 年之前
父节点
当前提交
6aa380df64
共有 6 个文件被更改,包括 467 次插入81 次删除
  1. 12 1
      modules/kazoo/kazoo.c
  2. 420 78
      modules/kazoo/kz_amqp.c
  3. 21 1
      modules/kazoo/kz_amqp.h
  4. 10 0
      modules/kazoo/kz_fixup.c
  5. 3 0
      modules/kazoo/kz_fixup.h
  6. 1 1
      modules/kazoo/kz_json.c

+ 12 - 1
modules/kazoo/kazoo.c

@@ -110,6 +110,9 @@ str kz_db_url = {0,0};
 str kz_query_timeout_avp = {0,0};
 pv_spec_t kz_query_timeout_spec;
 
+str kz_query_result_avp = str_init("$avp(amqp_result)");
+pv_spec_t kz_query_result_spec;
+
 str kz_app_name = str_init(NAME);
 
 MODULE_VERSION
@@ -152,7 +155,10 @@ static cmd_export_t cmds[] = {
     {"kazoo_json", (cmd_function) kz_json_get_field, 3, fixup_kz_json, fixup_kz_json_free, ANY_ROUTE},
     {"kazoo_json_keys", (cmd_function) kz_json_get_keys, 3, fixup_kz_json, fixup_kz_json_free, ANY_ROUTE},
     {"kazoo_encode", (cmd_function) kz_amqp_encode, 2, fixup_kz_amqp_encode, fixup_kz_amqp_encode_free, ANY_ROUTE},
-    {0, 0, 0, 0, 0, 0}
+
+    {"kazoo_async_query", (cmd_function) kz_amqp_async_query, 5, fixup_kz_async_amqp, fixup_kz_async_amqp_free, ANY_ROUTE},
+
+	{0, 0, 0, 0, 0, 0}
 };
 
 static param_export_t params[] = {
@@ -191,6 +197,7 @@ static param_export_t params[] = {
     {"amqp_heartbeats", INT_PARAM, &dbk_use_hearbeats},
     {"amqp_primary_zone", STR_PARAM, &dbk_primary_zone_name.s},
     {"amqp_command_hashtable_size", INT_PARAM, &dbk_command_table_size},
+    {"amqp_result_avp", STR_PARAM, &kz_query_result_avp.s},
     {0, 0, 0}
 };
 
@@ -231,6 +238,10 @@ static int kz_init_avp(void) {
 		memset( &kz_query_timeout_spec, 0, sizeof(pv_spec_t));
 	}
 
+	if ( kz_parse_avp(&kz_query_result_avp, &kz_query_result_spec, "amqp_result_avp") <0) {
+		return -1;
+	}
+
 	return 0;
 }
 

+ 420 - 78
modules/kazoo/kz_amqp.c

@@ -12,6 +12,7 @@
 #include "../../pvar.h"
 #include "../../mod_fix.h"
 #include "../../lvalue.h"
+#include "../tm/tm_load.h"
 
 
 #include "kz_amqp.h"
@@ -20,6 +21,8 @@
 
 #define RET_AMQP_ERROR 2
 
+struct tm_binds tmb;
+
 //kz_amqp_connection_pool_ptr kz_pool = NULL;
 
 kz_amqp_bindings_ptr kz_bindings = NULL;
@@ -29,6 +32,8 @@ static unsigned long rpl_query_routing_key_count = 0;
 
 typedef struct json_object *json_obj_ptr;
 
+extern pv_spec_t kz_query_result_spec;
+
 //kz_amqp_channel_ptr channels = NULL;
 extern int *kz_worker_pipes;
 extern int kz_cmd_pipe;
@@ -134,6 +139,23 @@ static char *kz_local_amqp_str_dup(str *src)
 	return res;
 }
 
+
+static inline str* kz_local_str_dup(str* src)
+{
+	char *dst_char = (char*)pkg_malloc(sizeof(str)+src->len+1);
+	if (!dst_char) {
+		LM_ERR("error allocating shared memory for str");
+		return NULL;
+	}
+	str* dst = (str*)dst_char;
+	dst->s = dst_char+sizeof(str);
+
+	memcpy(dst->s, src->s, src->len);
+	dst->len = src->len;
+	dst->s[dst->len] = '\0';
+	return dst;
+}
+
 /*
 static char *kz_local_amqp_string_dup(char *src)
 {
@@ -232,6 +254,34 @@ amqp_bytes_t kz_amqp_bytes_dup_from_str(str *src)
 	return kz_amqp_bytes_malloc_dup(amqp_cstring_bytes(src->s));
 }
 
+void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd)
+{
+	if(cmd == NULL)
+		return;
+	if (cmd->exchange)
+		shm_free(cmd->exchange);
+	if (cmd->exchange_type)
+		shm_free(cmd->exchange_type);
+	if (cmd->queue)
+		shm_free(cmd->queue);
+	if (cmd->routing_key)
+		shm_free(cmd->routing_key);
+	if (cmd->reply_routing_key)
+		shm_free(cmd->reply_routing_key);
+	if (cmd->payload)
+		shm_free(cmd->payload);
+	if (cmd->return_payload)
+		shm_free(cmd->return_payload);
+	if (cmd->message_id)
+		shm_free(cmd->message_id);
+	if (cmd->cb_route)
+		shm_free(cmd->cb_route);
+	if (cmd->err_route)
+		shm_free(cmd->err_route);
+	lock_release(&cmd->lock);
+	lock_destroy(&cmd->lock);
+	shm_free(cmd);
+}
 
 void kz_amqp_free_consumer_delivery(kz_amqp_consumer_delivery_ptr ptr)
 {
@@ -243,6 +293,10 @@ void kz_amqp_free_consumer_delivery(kz_amqp_consumer_delivery_ptr ptr)
 		shm_free(ptr->event_key);
 	if(ptr->event_subkey)
 		shm_free(ptr->event_subkey);
+	if(ptr->message_id)
+		shm_free(ptr->message_id);
+	if(ptr->cmd)
+		kz_amqp_free_pipe_cmd(ptr->cmd);
 	shm_free(ptr);
 }
 
@@ -275,32 +329,6 @@ void kz_amqp_free_connection(kz_amqp_connection_ptr conn)
 	shm_free(conn);
 }
 
-
-void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd)
-{
-	if(cmd == NULL)
-		return;
-	if (cmd->exchange)
-		shm_free(cmd->exchange);
-	if (cmd->exchange_type)
-		shm_free(cmd->exchange_type);
-	if (cmd->queue)
-		shm_free(cmd->queue);
-	if (cmd->routing_key)
-		shm_free(cmd->routing_key);
-	if (cmd->reply_routing_key)
-		shm_free(cmd->reply_routing_key);
-	if (cmd->payload)
-		shm_free(cmd->payload);
-	if (cmd->return_payload)
-		shm_free(cmd->return_payload);
-	if (cmd->message_id)
-		shm_free(cmd->message_id);
-	lock_release(&cmd->lock);
-	lock_destroy(&cmd->lock);
-	shm_free(cmd);
-}
-
 kz_amqp_cmd_ptr kz_amqp_alloc_pipe_cmd()
 {
 	kz_amqp_cmd_ptr cmd = (kz_amqp_cmd_ptr)shm_malloc(sizeof(kz_amqp_cmd));
@@ -466,6 +494,21 @@ int kz_amqp_bind_init_targeted_channel(kz_amqp_server_ptr server, int idx )
     return ret;
 }
 
+int kz_tm_bind()
+{
+	load_tm_f  load_tm;
+
+	if ( !(load_tm=(load_tm_f)find_export("load_tm", NO_SCRIPT, 0)))
+	{
+		LOG(L_ERR, "cannot import load_tm\n");
+		return 0;
+	}
+	if (load_tm( &tmb )==-1)
+		return 0;
+	return 1;
+}
+
+
 int kz_amqp_init() {
 	int i;
 	kz_amqp_zone_ptr g;
@@ -474,6 +517,9 @@ int kz_amqp_init() {
 	if(!kz_hash_init())
 		return 0;
 
+	if(!kz_tm_bind())
+		return 0;
+
 	if(kz_bindings == NULL) {
 		kz_bindings = (kz_amqp_bindings_ptr) shm_malloc(sizeof(kz_amqp_bindings));
 		memset(kz_bindings, 0, sizeof(kz_amqp_bindings));
@@ -1118,6 +1164,162 @@ int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param,	pv_value
 	return last_payload_result == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, last_payload_result);
 }
 
+int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key, char* _payload, char* _cb_route, char* _err_route)
+{
+	  str json_s;
+	  str exchange_s;
+	  str routing_key_s;
+	  str cb_route_s;
+	  str err_route_s;
+	  struct timeval kz_timeout = kz_qtimeout_tv;
+      int ret = -1;
+      json_obj_ptr json_obj = NULL;
+	  kz_amqp_cmd_ptr cmd = NULL;
+//	  json_obj_ptr json_body = NULL;
+	  unsigned int hash_index = 0;
+	  unsigned int label = 0;
+	  tm_cell_t *t = 0;
+
+	  str unique_string = { 0, 0 };
+	  char serverid[512];
+
+	  uuid_t id;
+	  char uuid_buffer[40];
+
+	  if (fixup_get_svalue(msg, (gparam_p)_exchange, &exchange_s) != 0) {
+		  LM_ERR("cannot get exchange string value\n");
+		  goto error;
+	  }
+
+	  if (fixup_get_svalue(msg, (gparam_p)_routing_key, &routing_key_s) != 0) {
+		  LM_ERR("cannot get routing_key string value\n");
+		  goto error;
+	  }
+
+	  if (fixup_get_svalue(msg, (gparam_p)_payload, &json_s) != 0) {
+		  LM_ERR("cannot get json string value : %s\n", _payload);
+		  goto error;
+	  }
+
+	  json_obj = json_tokener_parse(json_s.s);
+
+	  if (is_error(json_obj)) {
+		  LM_ERR("empty or invalid JSON payload : %*.s\n", json_s.len, json_s.s);
+		  goto error;
+	  }
+
+	  if (fixup_get_svalue(msg, (gparam_p)_cb_route, &cb_route_s) != 0) {
+		  LM_ERR("cannot get cb_route value\n");
+		  return -1;
+	  }
+
+	  if (fixup_get_svalue(msg, (gparam_p)_err_route, &err_route_s) != 0) {
+		  LM_ERR("cannot get err_route value\n");
+		  return -1;
+	  }
+
+	  if(kz_query_timeout_spec.type != PVT_NONE) {
+		  pv_value_t pv_val;
+		  if(pv_get_spec_value( msg, &kz_query_timeout_spec, &pv_val) == 0) {
+			  if((pv_val.flags & PV_VAL_INT) && pv_val.ri != 0 ) {
+				  kz_timeout.tv_usec = (pv_val.ri % 1000) * 1000;
+				  kz_timeout.tv_sec = pv_val.ri / 1000;
+				  LM_DBG("setting timeout to %i,%i\n", (int) kz_timeout.tv_sec, (int) kz_timeout.tv_usec);
+			  }
+		  }
+	  }
+
+	  t = tmb.t_gett();
+	  if (t==NULL || t==T_UNDEFINED) {
+		  if(tmb.t_newtran(msg)<0) {
+			  LM_ERR("cannot create the transaction\n");
+			  goto error;
+		  }
+		  t = tmb.t_gett();
+		  if (t==NULL || t==T_UNDEFINED) {
+			  LM_ERR("cannot look up the transaction\n");
+			  goto error;
+		  }
+	  }
+
+	  if (tmb.t_suspend(msg, &hash_index, &label) < 0) {
+		  LM_ERR("t_suspend() failed\n");
+		  goto error;
+	  }
+
+	  uuid_generate_random(id);
+	  uuid_unparse_lower(id, uuid_buffer);
+	  unique_string.s = uuid_buffer;
+	  unique_string.len = strlen(unique_string.s);
+	  sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++);
+	  kz_amqp_add_payload_common_properties(json_obj, serverid, &unique_string);
+
+		cmd = (kz_amqp_cmd_ptr)shm_malloc(sizeof(kz_amqp_cmd));
+		if(cmd == NULL) {
+			LM_ERR("failed to allocate kz_amqp_cmd in process %d\n", getpid());
+			goto error;
+		}
+		memset(cmd, 0, sizeof(kz_amqp_cmd));
+		cmd->exchange = kz_amqp_str_dup(&exchange_s);
+		cmd->routing_key = kz_amqp_str_dup(&routing_key_s);
+		cmd->reply_routing_key = kz_amqp_string_dup(serverid);
+		cmd->payload = kz_amqp_string_dup((char *)json_object_to_json_string(json_obj));
+		cmd->message_id = kz_str_dup(&unique_string);
+		cmd->timeout = kz_timeout;
+		cmd->cb_route = kz_amqp_str_dup(&cb_route_s);
+		cmd->err_route = kz_amqp_str_dup(&err_route_s);
+		cmd->t_hash = hash_index;
+		cmd->t_label = label;
+
+		if(cmd->payload == NULL || cmd->routing_key == NULL || cmd->exchange == NULL) {
+			LM_ERR("failed to allocate kz_amqp_cmd parameters in process %d\n", getpid());
+			goto error;
+		}
+		if(lock_init(&cmd->lock)==NULL)
+		{
+			LM_ERR("cannot init the lock for publishing in process %d\n", getpid());
+			lock_dealloc(&cmd->lock);
+			goto error;
+		}
+		cmd->type = KZ_AMQP_CMD_ASYNC_CALL;
+		cmd->consumer = getpid();
+		if (write(kz_cmd_pipe, &cmd, sizeof(cmd)) != sizeof(cmd)) {
+			LM_ERR("failed to publish message to amqp in process %d, write to command pipe: %s\n", getpid(), strerror(errno));
+			goto error;
+		}
+		ret = 0;
+		goto exit;
+
+error:
+		if(cmd)
+			kz_amqp_free_pipe_cmd(cmd);
+
+		if(hash_index | label)
+			tmb.t_cancel_suspend(hash_index, label);
+
+exit:
+	    if(json_obj)
+	    	json_object_put(json_obj);
+
+	    return ret;
+};
+
+void kz_amqp_reset_last_result()
+{
+	if(last_payload_result)
+		pkg_free(last_payload_result);
+	last_payload_result = NULL;
+}
+
+void kz_amqp_set_last_result(char* json)
+{
+	kz_amqp_reset_last_result();
+	int len = strlen(json);
+	char* value = pkg_malloc(len+1);
+	memcpy(value, json, len);
+	value[len] = '\0';
+	last_payload_result = value;
+}
 
 int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
 {
@@ -1820,40 +2022,70 @@ void kz_amqp_fire_connection_event(char *event, char* host)
 	kz_amqp_send_consumer_event(payload, 1);
 }
 
+void kz_amqp_cb_ok(kz_amqp_cmd_ptr cmd)
+{
+	int n = route_get(&main_rt, cmd->cb_route);
+	struct action *a = main_rt.rlist[n];
+	tmb.t_continue(cmd->t_hash, cmd->t_label, a);
+}
+
+void kz_amqp_cb_error(kz_amqp_cmd_ptr cmd)
+{
+	int n = route_get(&main_rt, cmd->err_route);
+	struct action *a = main_rt.rlist[n];
+	tmb.t_continue(cmd->t_hash, cmd->t_label, a);
+	kz_amqp_free_pipe_cmd(cmd);
+}
+
 void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
 {
-	kz_amqp_cmd_ptr cmd = (kz_amqp_cmd_ptr) arg;
+	kz_amqp_cmd_timeout_ptr cmd = (kz_amqp_cmd_timeout_ptr) arg;
 	kz_amqp_cmd_ptr retrieved_cmd = kz_cmd_retrieve(cmd->message_id);
-	if(retrieved_cmd == NULL) {
-		LM_ERR("timer what!? ");
-	} else {
+	if(retrieved_cmd != NULL) {
 		LM_DBG("amqp message timeout for exchange '%s' with routing key '%s' and message id '%.*s'\n"
-				, cmd->exchange, cmd->routing_key
-				, cmd->message_id->len, cmd->message_id->s
+				, retrieved_cmd ->exchange, retrieved_cmd ->routing_key
+				, retrieved_cmd ->message_id->len, retrieved_cmd ->message_id->s
 				);
-		close(retrieved_cmd->timerfd);
-		event_del(retrieved_cmd->timer_ev);
-		pkg_free(retrieved_cmd->timer_ev);
-		retrieved_cmd->return_code = -1;
-		lock_release(&retrieved_cmd->lock);
+		if(retrieved_cmd->type == KZ_AMQP_CMD_ASYNC_CALL) {
+			kz_amqp_cb_error(retrieved_cmd);
+		} else {
+			retrieved_cmd->return_code = -1;
+			lock_release(&retrieved_cmd->lock);
+		}
 	}
+	close(cmd->timerfd);
+	event_del(cmd->timer_ev);
+	pkg_free(cmd->timer_ev);
+	pkg_free(cmd);
+
 }
 
 
 int kz_amqp_start_cmd_timer(kz_amqp_cmd_ptr cmd)
 {
-	int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+	kz_amqp_cmd_timeout_ptr timeout_cmd = pkg_malloc(sizeof(kz_amqp_cmd_timeout));
+	if(timeout_cmd == NULL) {
+		LM_ERR("Could not allocate memory for kz_amqp_cmd_timeout_ptr\n");
+		goto error;
+	}
+
+	timeout_cmd->message_id = kz_local_str_dup(cmd->message_id);
+	if(timeout_cmd->message_id == NULL) {
+		LM_ERR("Could not allocate memory for kz_amqp_cmd_timeout_ptr message_id\n");
+		goto error;
+	}
 
+	int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
 	if (timerfd == -1) {
-		LM_ERR("Could not create timerfd.");
-		return 0;
+		LM_ERR("Could not create timerfd\n");
+		goto error;
 	}
 
-	cmd->timerfd = timerfd;
+	timeout_cmd->timerfd = timerfd;
 	struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
 	if(itime == NULL){
-		LM_ERR("Could not set timer.");
-		return 0;
+		LM_ERR("Could not set timer\n");
+		goto error;
 	}
 	itime->it_interval.tv_sec = 0;
 	itime->it_interval.tv_nsec = 0;
@@ -1861,25 +2093,34 @@ int kz_amqp_start_cmd_timer(kz_amqp_cmd_ptr cmd)
 	itime->it_value.tv_sec = cmd->timeout.tv_sec;
 	itime->it_value.tv_nsec = cmd->timeout.tv_usec * 1000;
 	if (timerfd_settime(timerfd, 0, itime, NULL) == -1) {
-		LM_ERR("Could not set timer.");
-		return 0;
+		LM_ERR("Could not set timer\n");
+		goto error;
 	}
 	pkg_free(itime);
 	struct event *timer_ev = pkg_malloc(sizeof(struct event));
 	if(timer_ev == NULL) {
-		LM_ERR("Could not allocate timer_ev.");
-		return 0;
+		LM_ERR("Could not allocate timer_ev\n");
+		goto error;
 	}
-	event_set(timer_ev, timerfd, EV_READ, kz_amqp_cmd_timeout_cb, cmd);
+	event_set(timer_ev, timerfd, EV_READ, kz_amqp_cmd_timeout_cb, timeout_cmd);
 	if(event_add(timer_ev, NULL) == -1) {
-		LM_ERR("event_add failed while setting request timer (%s).", strerror(errno));
+		LM_ERR("event_add failed while setting request timer (%s)\n", strerror(errno));
 		pkg_free(timer_ev);
-		return 0;
+		goto error;
 	}
-	cmd->timer_ev = timer_ev;
+	timeout_cmd->timer_ev = timer_ev;
 	return 1;
+
+error:
+	if(timeout_cmd) {
+		if(timeout_cmd->message_id)
+			pkg_free(timeout_cmd->message_id);
+		pkg_free(timeout_cmd);
+	}
+	return 0;
 }
 
+/*
 void kz_amqp_stop_cmd_timer(kz_amqp_cmd_ptr cmd)
 {
 	if (cmd->timer_ev) {
@@ -1890,7 +2131,7 @@ void kz_amqp_stop_cmd_timer(kz_amqp_cmd_ptr cmd)
 			LM_ERR("No timer for message id %.*s\n", cmd->message_id->len, cmd->message_id->s);
 		}
 }
-
+*/
 
 /* check timeouts */
 int kz_amqp_timeout_proc()
@@ -2062,7 +2303,10 @@ int kz_amqp_publisher_send(kz_amqp_cmd_ptr cmd)
 		for (s = g->servers->head; s != NULL && sent == 0; s = s->next) {
 			if(cmd->server_id == s->id || cmd->server_id == 0) {
 				if(s->producer->state == KZ_AMQP_CONNECTION_OPEN) {
-					if(cmd->type == KZ_AMQP_CMD_PUBLISH || cmd->type == KZ_AMQP_CMD_PUBLISH_BROADCAST) {
+					if(cmd->type == KZ_AMQP_CMD_PUBLISH
+							|| cmd->type == KZ_AMQP_CMD_PUBLISH_BROADCAST
+							|| cmd->type == KZ_AMQP_CMD_ASYNC_CALL)
+					{
 						idx = kz_amqp_send(s, cmd);
 						if(idx >= 0) {
 							cmd->return_code = AMQP_RESPONSE_NORMAL;
@@ -2152,7 +2396,7 @@ void kz_amqp_publisher_proc_cb(int fd, short event, void *arg)
 			LM_DBG("amqp message id %.*s not found.\n", cmd->message_id->len, cmd->message_id->s);
 			kz_amqp_free_pipe_cmd(cmd);
 		} else {
-			kz_amqp_stop_cmd_timer(retrieved_cmd);
+//			kz_amqp_stop_cmd_timer(retrieved_cmd);
 			retrieved_cmd->return_code = cmd->return_code;
 			retrieved_cmd->return_payload = cmd->return_payload;
 			cmd->return_payload = NULL;
@@ -2167,6 +2411,17 @@ void kz_amqp_publisher_proc_cb(int fd, short event, void *arg)
 		break;
 
 	case KZ_AMQP_CMD_ASYNC_CALL:
+		if(kz_amqp_publisher_send(cmd) < 0) {
+			kz_amqp_cb_error(cmd);
+		} else {
+			if(!kz_cmd_store(cmd)) {
+				kz_amqp_cb_error(cmd);
+			} else {
+				if(!kz_amqp_start_cmd_timer(cmd)) {
+					kz_amqp_cb_error(cmd);
+				}
+			}
+		}
 		break;
 
 	case KZ_AMQP_CMD_COLLECT:
@@ -2284,6 +2539,93 @@ error:
 
 }
 
+void kz_amqp_send_worker_event(amqp_envelope_t* envelope, kz_amqp_bind_ptr bind)
+{
+    char buffer[100];
+    kz_amqp_cmd_ptr cmd = NULL;
+    kz_amqp_consumer_delivery_ptr ptr = NULL;
+    str* message_id = NULL;
+    int idx = envelope->channel-1;
+    /*
+	char* payload = kz_amqp_bytes_dup(envelope->message.body);
+	if(!payload) {
+		LM_ERR("error alocating memory\n");
+		return;
+	}
+	*/
+
+	json_obj_ptr json_obj = kz_json_parse((char*)envelope->message.body.bytes);
+    if (json_obj == NULL) {
+    	LM_ERR("error parsing json body\n");
+    	return;
+    }
+
+	json_object* JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID);
+    if(JObj != NULL) {
+        const char* server_id_str = json_object_get_string(JObj);
+        sprintf(buffer, "consumer://%d/%s", server_id, server_id_str);
+        json_object_object_del(json_obj, BLF_JSON_SERVERID);
+        json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string(buffer));
+    }
+
+    JObj = kz_json_get_object(json_obj, BLF_JSON_MSG_ID);
+    if(JObj != NULL) {
+    	message_id = kz_str_dup_from_char((char*)json_object_get_string(JObj));
+    	if(message_id == NULL) {
+    		LM_ERR("Error allocating memory for message_id copy\n");
+    		goto error;
+    	}
+		if(idx < dbk_channels) {
+			cmd = kz_cmd_retrieve(message_id);
+			/*
+			if(cmd != NULL) {
+				cmd->return_code = 0;
+				cmd->return_payload = kz_amqp_string_dup((char*)json_object_to_json_string(json_obj));
+			}
+			*/
+		}
+    }
+
+	ptr = (kz_amqp_consumer_delivery_ptr) shm_malloc(sizeof(kz_amqp_consumer_delivery));
+	if(ptr == NULL) {
+		LM_ERR("NO MORE SHARED MEMORY!");
+		goto error;
+	}
+	memset(ptr, 0, sizeof(kz_amqp_consumer_delivery));
+	ptr->channel = envelope->channel;
+	ptr->delivery_tag = envelope->delivery_tag;
+	ptr->payload = kz_amqp_string_dup((char*)json_object_to_json_string(json_obj));
+	ptr->cmd = cmd;
+	ptr->message_id = message_id;
+
+	if(bind) {
+		ptr->event_key = kz_amqp_bytes_dup(bind->event_key);
+		ptr->event_subkey = kz_amqp_bytes_dup(bind->event_subkey);
+	}
+
+	consumer++;
+	if(consumer >= dbk_consumer_processes) {
+		consumer = 0;
+	}
+
+	if (write(kz_worker_pipes[consumer], &ptr, sizeof(ptr)) != sizeof(ptr)) {
+		LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), ptr->payload);
+		goto error;
+	}
+
+	json_object_put(json_obj);
+
+	return;
+
+error:
+	if(ptr)
+		kz_amqp_free_consumer_delivery(ptr);
+
+	if(json_obj)
+		json_object_put(json_obj);
+
+}
+
 
 int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 {
@@ -2388,29 +2730,22 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 			case AMQP_RESPONSE_NORMAL:
 				idx = envelope.channel-1;
 				if(idx < dbk_channels) {
-					kz_send_targeted_cmd(server_ptr->id, envelope.message.body);
-
-					/*
-					switch(server_ptr->channels[idx].state) {
-						case KZ_AMQP_CHANNEL_CALLING:
-							lock_get(&server_ptr->channels[idx].lock);
-							if(server_ptr->channels[idx].cmd != NULL) {
-								server_ptr->channels[idx].cmd->return_payload = kz_amqp_bytes_dup(envelope.message.body);
-								server_ptr->channels[idx].cmd->return_code = AMQP_RESPONSE_NORMAL;
-								lock_release(&server_ptr->channels[idx].cmd->lock);
-								server_ptr->channels[idx].cmd = NULL;
-								server_ptr->channels[idx].state = KZ_AMQP_CHANNEL_FREE;
-							}
-							lock_release(&server_ptr->channels[idx].lock);
-							break;
-						default:
-							LM_INFO("ignoring received payload on consumer - %.*s\n", (int) envelope.message.body.len, (char*)envelope.message.body.bytes);
-							break;
+					kz_amqp_send_worker_event(&envelope, NULL);
+				} else {
+					idx = idx - dbk_channels;
+					if(!server_ptr->consumer_channels[idx].consumer->no_ack ) {
+						if(amqp_basic_ack(server_ptr->consumer->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) {
+							LM_ERR("AMQP ERROR TRYING TO ACK A MSG\n");
+							OK = 0;
+						}
 					}
-					*/
-
-
-
+					if(OK)
+						kz_amqp_send_worker_event(&envelope, server_ptr->consumer_channels[idx].consumer);
+				}
+				/*
+				idx = envelope.channel-1;
+				if(idx < dbk_channels) {
+					kz_send_targeted_cmd(server_ptr->id, envelope.message.body);
 				} else {
 					idx = idx - dbk_channels;
 					kz_amqp_send_consumer_event_ex(maybe_add_consumer_key(server_ptr->id, envelope.message.body),
@@ -2424,6 +2759,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 						}
 					}
 				}
+				*/
 				break;
 			case AMQP_RESPONSE_SERVER_EXCEPTION:
 				LM_ERR("AMQP_RESPONSE_SERVER_EXCEPTION in consume\n");
@@ -2446,7 +2782,6 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 
 void kz_amqp_consumer_worker_cb(int fd, short event, void *arg)
 {
-
 	kz_amqp_consumer_delivery_ptr cmd;
 	if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
 		LM_ERR("failed to read from command pipe: %s\n", strerror(errno));
@@ -2454,7 +2789,14 @@ void kz_amqp_consumer_worker_cb(int fd, short event, void *arg)
 	}
 
 	LM_DBG("consumer %d received payload %s\n", my_pid(), cmd->payload);
-	kz_amqp_consumer_event(cmd->payload, cmd->event_key, cmd->event_subkey);
+
+	if(cmd->cmd) {
+		kz_amqp_set_last_result(cmd->payload);
+		kz_amqp_cb_ok(cmd->cmd);
+	} else {
+		kz_amqp_consumer_event(cmd->payload, cmd->event_key, cmd->event_subkey);
+	}
+
 	kz_amqp_free_consumer_delivery(cmd);
 	LM_DBG("exiting consumer %d\n", my_pid());
 }

+ 21 - 1
modules/kazoo/kz_amqp.h

@@ -112,11 +112,27 @@ typedef struct {
 	amqp_channel_t channel;
 	struct timeval timeout;
 
+	/* timer */
+//	struct event *timer_ev;
+//	int timerfd;
+
+	/* async */
+	char *cb_route;
+	char *err_route;
+	unsigned int t_hash;
+	unsigned int t_label;
+
+
+} kz_amqp_cmd, *kz_amqp_cmd_ptr;
+
+typedef struct {
+	str* message_id;
+
 	/* timer */
 	struct event *timer_ev;
 	int timerfd;
 
-} kz_amqp_cmd, *kz_amqp_cmd_ptr;
+} kz_amqp_cmd_timeout, *kz_amqp_cmd_timeout_ptr;
 
 typedef struct kz_amqp_cmd_entry_t {
 	kz_amqp_cmd_ptr cmd;
@@ -135,6 +151,8 @@ typedef struct {
 	amqp_channel_t channel;
 	char* event_key;
 	char* event_subkey;
+	str* message_id;
+	kz_amqp_cmd_ptr cmd;
 } kz_amqp_consumer_delivery, *kz_amqp_consumer_delivery_ptr;
 
 typedef struct {
@@ -213,6 +231,8 @@ int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange
 int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded);
 int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val);
 
+int kz_amqp_async_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _cb_route, char* _err_route);
+
 //void kz_amqp_generic_consumer_loop(int child_no);
 void kz_amqp_manager_loop(int child_no);
 

+ 10 - 0
modules/kazoo/kz_fixup.c

@@ -134,4 +134,14 @@ int fixup_kz_amqp4_free(void** param, int param_no)
 }
 
 
+int fixup_kz_async_amqp(void** param, int param_no)
+{
+	return fixup_spve_null(param, 1);
+}
+
+int fixup_kz_async_amqp_free(void** param, int param_no)
+{
+	return fixup_free_spve_null(param, 1);
+}
+
 

+ 3 - 0
modules/kazoo/kz_fixup.h

@@ -11,6 +11,9 @@
 int fixup_kz_amqp(void** param, int param_no);
 int fixup_kz_amqp_free(void** param, int param_no);
 
+int fixup_kz_async_amqp(void** param, int param_no);
+int fixup_kz_async_amqp_free(void** param, int param_no);
+
 int fixup_kz_amqp4(void** param, int param_no);
 int fixup_kz_amqp4_free(void** param, int param_no);
 

+ 1 - 1
modules/kazoo/kz_json.c

@@ -235,7 +235,7 @@ struct json_object* kz_json_parse(const char *str)
 
     tok = json_tokener_new();
     if (!tok) {
-      LM_ERR("Error parsing json: cpuld not allocate tokener\n");
+      LM_ERR("Error parsing json: could not allocate tokener\n");
       return NULL;
     }