Pārlūkot izejas kodu

kazoo initial commit

lazedo 11 gadi atpakaļ
vecāks
revīzija
e134574be9

+ 23 - 0
modules/kazoo/Makefile

@@ -0,0 +1,23 @@
+# $Id$
+#
+# WARNING: do not run this directly, it should be run by the master Makefile
+
+include ../../Makefile.defs
+auto_gen=
+NAME=kazoo.so
+
+DEFS+=-I/usr/local/include -DKAMAILIO_MOD_INTERFACE
+LIBS=-L/usr/local/lib -lrabbitmq -ljson
+
+DEFS += -DSER_MOD_INTERFACE
+
+ifeq ($(CROSS_COMPILE),)
+XML2CFG=$(shell which xml2-config)
+endif
+
+SERLIBPATH=../../lib
+SER_LIBS=$(SERLIBPATH)/srdb2/srdb2 $(SERLIBPATH)/srdb1/srdb1
+SER_LIBS+=$(SERLIBPATH)/kmi/kmi
+SER_LIBS+=$(SERLIBPATH)/kcore/kcore
+
+include ../../Makefile.modules

+ 17 - 0
modules/kazoo/README

@@ -0,0 +1,17 @@
+KAZOO Module
+
+2600hz
+
+   <[email protected]>
+
+Edited by
+
+Luis Azedo
+
+   <[email protected]>
+
+     __________________________________________________________________
+
+   Table of Contents
+
+Documentation to come in the next days

+ 38 - 0
modules/kazoo/const.c

@@ -0,0 +1,38 @@
+/*
+ * const.c
+ *
+ *  Created on: Jul 15, 2014
+ *      Author: root
+ */
+
+#include "../../str.h"
+
+str str_event_message_summary = str_init("message-summary");
+str str_event_dialog = str_init("dialog");
+str str_event_presence = str_init("presence");
+
+str str_username_col = str_init("username");
+str str_domain_col = str_init("domain");
+str str_body_col = str_init("body");
+str str_expires_col = str_init("expires");
+str str_received_time_col = str_init("received_time");
+str str_presentity_uri_col = str_init("presentity_uri");
+
+str str_event_col = str_init("event");
+str str_contact_col = str_init("contact");
+str str_callid_col = str_init("callid");
+str str_from_tag_col = str_init("from_tag");
+str str_to_tag_col = str_init("to_tag");
+str str_etag_col = str_init("etag");
+str str_sender_col = str_init("sender");
+
+str str_presence_note_busy = str_init("Busy");
+str str_presence_note_otp = str_init("On the Phone");
+str str_presence_note_idle = str_init("Idle");
+str str_presence_note_offline = str_init("Offline");
+str str_presence_act_busy = str_init("<rpid:busy/>");
+str str_presence_act_otp = str_init("<rpid:on-the-phone/>");
+str str_presence_status_offline = str_init("closed");
+str str_presence_status_online = str_init("open");
+
+str str_null_string = str_init("NULL");

+ 44 - 0
modules/kazoo/const.h

@@ -0,0 +1,44 @@
+/*
+ * const.h
+ *
+ *  Created on: Jul 15, 2014
+ *      Author: root
+ */
+
+#ifndef DBK_CONST_H_
+#define DBK_CONST_H_
+
+#include "../../str.h"
+
+extern str str_event_message_summary;
+extern str str_event_dialog;
+extern str str_event_presence;
+
+extern str str_username_col;
+extern str str_domain_col;
+extern str str_body_col;
+extern str str_expires_col;
+extern str str_received_time_col;
+extern str str_presentity_uri_col;
+
+extern str str_event_col;
+extern str str_contact_col;
+extern str str_callid_col;
+extern str str_from_tag_col;
+extern str str_to_tag_col;
+extern str str_etag_col;
+extern str str_sender_col;
+
+extern str str_presence_note_busy;
+extern str str_presence_note_otp;
+extern str str_presence_note_idle;
+extern str str_presence_note_offline;
+extern str str_presence_act_busy;
+extern str str_presence_act_otp;
+extern str str_presence_status_offline;
+extern str str_presence_status_online;
+
+extern str str_null_string;
+
+
+#endif /* DBK_CONST_H_ */

+ 112 - 0
modules/kazoo/defs.h

@@ -0,0 +1,112 @@
+/*
+ * defs.h
+ *
+ *  Created on: Jul 15, 2014
+ *      Author: root
+ */
+
+#ifndef DBK_DEFS_H_
+#define DBK_DEFS_H_
+
+#define BLF_MAX_DIALOGS 8
+#define BLF_JSON_FROM      	"From"
+#define BLF_JSON_FROM_USER 	"From-User"
+#define BLF_JSON_FROM_REALM	"From-Realm"
+#define BLF_JSON_TO        	"To"
+#define BLF_JSON_TO_USER 	"To-User"
+#define BLF_JSON_TO_REALM	"To-Realm"
+#define BLF_JSON_CALLID    	"Call-ID"
+#define BLF_JSON_TOTAG     	"To-Tag"
+#define BLF_JSON_FROMTAG   	"From-Tag"
+#define BLF_JSON_STATE     	"State"
+#define BLF_JSON_USER      	"User"
+#define BLF_JSON_QUEUE     	"Queue"
+#define BLF_JSON_EXPIRES	"Expires"
+#define BLF_JSON_APP_NAME       "App-Name"
+#define BLF_JSON_APP_VERSION    "App-Version"
+#define BLF_JSON_NODE           "Node"
+#define BLF_JSON_SERVERID       "Server-ID"
+#define BLF_JSON_EVENT_CATEGORY "Event-Category"
+#define BLF_JSON_EVENT_NAME     "Event-Name"
+#define BLF_JSON_TYPE           "Type"
+#define BLF_JSON_MSG_ID         "Msg-ID"
+#define BLF_JSON_DIRECTION      "Direction"
+
+#define BLF_JSON_CONTACT   	"Contact"
+#define BLF_JSON_EVENT_PKG      "Event-Package"
+#define MWI_JSON_WAITING        "Messages-Waiting"
+#define MWI_JSON_NEW            "Messages-New"
+#define MWI_JSON_SAVED          "Messages-Saved"
+#define MWI_JSON_URGENT         "Messages-Urgent"
+#define MWI_JSON_URGENT_SAVED   "Messages-Urgent-Saved"
+#define MWI_JSON_ACCOUNT        "Message-Account"
+#define MWI_JSON_FROM      	"From"
+#define MWI_JSON_TO        	"To"
+
+#define DIALOGINFO_BODY_BUFFER_SIZE 8192
+#define MWI_BODY_BUFFER_SIZE 2048
+#define PRESENCE_BODY_BUFFER_SIZE 4096
+
+#define MWI_BODY             "Messages-Waiting: %.*s\r\nMessage-Account: %.*s\r\nVoice-Message: %.*s/%.*s (%.*s/%.*s)\r\n"
+#define PRESENCE_BODY        "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
+<presence xmlns=\"urn:ietf:params:xml:ns:pidf\" xmlns:dm=\"urn:ietf:params:xml:ns:pidf:data-model\" xmlns:rpid=\"urn:ietf:params:xml:ns:pidf:rpid\" xmlns:c=\"urn:ietf:params:xml:ns:pidf:cipid\" entity=\"%s\"> \
+    <tuple xmlns=\"urn:ietf:params:xml:ns:pidf\" id=\"%s\"> \
+        <status> \
+            <basic>%s</basic> \
+        </status> \
+    </tuple> \
+    <note xmlns=\"urn:ietf:params:xml:ns:pidf\">%s</note> \
+    <dm:person xmlns:dm=\"urn:ietf:params:xml:ns:pidf:data-model\" xmlns:rpid=\"urn:ietf:params:xml:ns:pidf:rpid\" id=\"1\"> \
+        <rpid:activities>%s</rpid:activities> \
+        <dm:note>%s</dm:note> \
+    </dm:person> \
+</presence>"
+
+#define DIALOGINFO_EMPTY_BODY "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
+<dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\"> \
+   <dialog direction=\"initiator\"> \
+      <state>terminated</state> \
+   </dialog> \
+</dialog-info>"
+
+#define DIALOGINFO_BODY "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
+<dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\"> \
+   <dialog id=\"%.*s\" call-id=\"%.*s\" local-tag=\"%.*s\" remote-tag=\"%.*s\" direction=\"%.*s\"> \
+      <state>%.*s</state> \
+      <local> \
+         <identity>%.*s</identity> \
+         <target uri=\"sip:%.*s\"/> \
+      </local> \
+      <remote> \
+         <identity>%.*s</identity> \
+         <target uri=\"sip:%.*s\"/> \
+      </remote> \
+   </dialog> \
+</dialog-info>"
+
+#define DIALOGINFO_BODY_2 "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
+<dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\"> \
+   <dialog id=\"%.*s\" call-id=\"%.*s\" local-tag=\"%.*s\" remote-tag=\"%.*s\" direction=\"%.*s\"> \
+      <state>%.*s</state> \
+      <local> \
+         <identity>%.*s</identity> \
+      </local> \
+      <remote> \
+         <identity>%.*s</identity> \
+      </remote> \
+   </dialog> \
+</dialog-info>"
+
+#define json_extract_field(json_name, field)  do {                      \
+    struct json_object* obj = json_object_object_get(json_obj, json_name); \
+    field.s = (char*)json_object_get_string(obj);                       \
+    if (field.s == NULL) {                                              \
+      LM_DBG("Json-c error - failed to extract field [%s]\n", json_name); \
+      field.s = "";                                                     \
+    } else {                                                            \
+      field.len = strlen(field.s);                                      \
+    }                                                                   \
+    LM_DBG("%s: [%s]\n", json_name, field.s?field.s:"Empty");           \
+  } while (0);
+
+#endif /* DBK_DEFS_H_ */

+ 4 - 0
modules/kazoo/doc/Makefile

@@ -0,0 +1,4 @@
+docs = kazoo.xml
+
+docbook_dir = ../../../docbook
+include $(docbook_dir)/Makefile.module

+ 39 - 0
modules/kazoo/doc/kazoo.xml

@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+
+<book xmlns:xi="http://www.w3.org/2001/XInclude">
+    <bookinfo>
+	<title>KAZOO Module</title>
+	<productname class="trade">&kamailioname;</productname>
+	<authorgroup>
+	    <author>
+		<firstname>Luis</firstname>
+		<surname>Azedo</surname>
+		<email>[email protected]</email>
+	    </author>
+	    <editor>
+		<firstname>Luis</firstname>
+		<surname>Azedo</surname>
+		<email>[email protected]</email>
+	    </editor>
+	</authorgroup>
+	<copyright>
+	    <year>2010</year>
+	    <year>2014</year>
+	    <holder>&fhg;</holder>
+	</copyright>
+    </bookinfo>
+    <toc></toc>
+    
+    <xi:include href="db_text_admin.xml"/>
+    <!-- <xi:include href="db_text_devel.xml"/> -->
+    
+
+</book>

+ 22 - 0
modules/kazoo/doc/kazoo_admin.xml

@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+<!-- Module User's Guide -->
+
+<chapter xmlns:xi="http://www.w3.org/2001/XInclude">
+	<title>&adminguide;</title>
+
+	<section>
+	<title>Overview</title>
+	<para>
+		Documentation to come in the next days.
+	</para>
+	</section>
+</chapter>
+

+ 395 - 0
modules/kazoo/kazoo.c

@@ -0,0 +1,395 @@
+/*
+ * $Id$
+ *
+ * Kazoo module interface
+ *
+ * Copyright (C) 2013 2600Hz
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * History:
+ * --------
+ * 2013-04  first version (Anca Vamanu)
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "../../sr_module.h"
+#include "../../lib/srdb1/db.h"
+#include "../../dprint.h"
+#include "../../lib/kmi/mi.h"
+#include "../tm/tm_load.h"
+#include "../../cfg/cfg_struct.h"
+
+#include "../pua/pua.h"
+#include "../pua/pua_bind.h"
+#include "../pua/send_publish.h"
+#include "../presence/bind_presence.h"
+
+#include "kz_amqp.h"
+#include "kz_json.h"
+#include "kz_fixup.h"
+#include "kz_trans.h"
+#include "kz_pua.h"
+
+#define DBK_DEFAULT_NO_CONSUMERS 4
+
+static int mod_init(void);
+static int  mod_child_init(int rank);
+static int fire_init_event(int rank);
+static void mod_destroy(void);
+static void  mod_consumer_proc(int rank);
+
+str dbk_node_hostname = { 0, 0 };
+str dbk_reg_fs_path = { 0, 0 };
+
+int dbk_auth_wait_timeout = 3;
+int dbk_reconn_retries = 8;
+
+int dbk_presentity_phtable_size = 4096;
+
+int dbk_dialog_expires = 30;
+int dbk_presence_expires = 3600;
+int dbk_mwi_expires = 3600;
+int dbk_create_empty_dialog = 1;
+
+int dbk_channels = 50;
+
+int dbk_consumer_processes = DBK_DEFAULT_NO_CONSUMERS;
+
+struct timeval kz_sock_tv = (struct timeval){0,100000};
+struct timeval kz_amqp_tv = (struct timeval){0,100000};
+struct timeval kz_qtimeout_tv = (struct timeval){2,0};
+struct timeval kz_ack_tv = (struct timeval){0,100000};
+
+
+str dbk_consumer_event_key = str_init("Event-Category");
+str dbk_consumer_event_subkey = str_init("Event-Name");
+
+int dbk_internal_loop_count = 5;
+int dbk_consumer_loop_count = 10;
+int dbk_consumer_ack_loop_count = 20;
+int dbk_include_entity = 0;
+int dbk_pua_mode = 1;
+
+int dbk_single_consumer_on_reconnect = 1;
+int dbk_consume_messages_on_reconnect = 1;
+
+
+struct tm_binds tmb;
+pua_api_t kz_pua_api;
+presence_api_t kz_presence_api;
+
+int startup_time = 0;
+
+int *kz_pipe_fds = NULL;
+
+db1_con_t * shared_db1 = NULL;
+
+/* database connection */
+db1_con_t *kz_pa_db = NULL;
+db_func_t kz_pa_dbf;
+str kz_presentity_table = str_init("presentity");
+str kz_db_url = {0,0};
+
+MODULE_VERSION
+
+static tr_export_t mod_trans[] = {
+	{ {"kz", sizeof("kz")-1}, kz_tr_parse},
+	{ { 0, 0 }, 0 }
+};
+
+static pv_export_t kz_mod_pvs[] = {
+	{{"kzR", (sizeof("kzR")-1)}, PVT_OTHER, kz_pv_get_last_query_result, 0,	0, 0, 0, 0},
+	{{"kzE", (sizeof("kzE")-1)}, PVT_OTHER, kz_pv_get_event_payload, 0,	0, 0, 0, 0},
+	{ {0, 0}, 0, 0, 0, 0, 0, 0, 0 }
+};
+
+/*
+ *  database module interface
+ */
+static cmd_export_t cmds[] = {
+    {"kazoo_publish", (cmd_function) kz_amqp_publish, 3, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
+    {"kazoo_query", (cmd_function) kz_amqp_query, 4, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
+    {"kazoo_query", (cmd_function) kz_amqp_query_ex, 3, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
+    {"kazoo_pua_publish", (cmd_function) kz_pua_publish, 1, 0, 0, ANY_ROUTE},
+    /*
+    {"kazoo_pua_flush", (cmd_function) w_mi_dbk_presentity_flush0, 0, 0, 0, ANY_ROUTE},
+    {"kazoo_pua_flush", (cmd_function) w_mi_dbk_presentity_flush1, 1, 0, 0, ANY_ROUTE},
+    {"kazoo_pua_flush", (cmd_function) w_mi_dbk_presentity_flush2, 2, 0, 0, ANY_ROUTE},
+    {"kazoo_pua_flush", (cmd_function) w_mi_dbk_presentity_flush3, 3, 0, 0, ANY_ROUTE},
+    */
+
+/*
+    {"kazoo_subscribe", (cmd_function) kz_amqp_subscribe_1, 1, fixup_kz_amqp4, fixup_kz_amqp4_free, ANY_ROUTE},
+    {"kazoo_subscribe", (cmd_function) kz_amqp_subscribe_2, 2, fixup_kz_amqp4, fixup_kz_amqp4_free, ANY_ROUTE},
+    {"kazoo_subscribe", (cmd_function) kz_amqp_subscribe_3, 3, fixup_kz_amqp4, fixup_kz_amqp4_free, ANY_ROUTE},
+*/
+    {"kazoo_subscribe", (cmd_function) kz_amqp_subscribe, 1, fixup_kz_amqp4, fixup_kz_amqp4_free, ANY_ROUTE},
+    {"kazoo_subscribe", (cmd_function) kz_amqp_subscribe_simple, 4, fixup_kz_amqp4, fixup_kz_amqp4_free, ANY_ROUTE},
+
+
+    {"kazoo_json", (cmd_function) kz_json_get_field, 3, fixup_kz_json, fixup_kz_json_free, ANY_ROUTE},
+    {"kazoo_encode", (cmd_function) kz_amqp_encode, 2, fixup_kz_amqp_encode, fixup_kz_amqp_encode_free, ANY_ROUTE},
+    {0, 0, 0, 0, 0, 0}
+};
+
+static param_export_t params[] = {
+    {"node_hostname", STR_PARAM, &dbk_node_hostname.s},
+    {"dialog_expires", INT_PARAM, &dbk_dialog_expires},
+    {"presence_expires", INT_PARAM, &dbk_presence_expires},
+    {"mwi_expires", INT_PARAM, &dbk_mwi_expires},
+    {"amqp_connection", STR_PARAM|USE_FUNC_PARAM,(void*)kz_amqp_add_connection},
+    {"amqp_max_channels", INT_PARAM, &dbk_channels},
+    {"amqp_consumer_ack_timeout_micro", INT_PARAM, &kz_ack_tv.tv_usec},
+    {"amqp_consumer_ack_timeout_sec", INT_PARAM, &kz_ack_tv.tv_sec},
+    {"amqp_interprocess_timeout_micro", INT_PARAM, &kz_sock_tv.tv_usec},
+    {"amqp_interprocess_timeout_sec", INT_PARAM, &kz_sock_tv.tv_sec},
+    {"amqp_waitframe_timout_micro", INT_PARAM, &kz_amqp_tv.tv_usec},
+    {"amqp_waitframe_timout_sec", INT_PARAM, &kz_amqp_tv.tv_sec},
+    {"amqp_consumer_processes", INT_PARAM, &dbk_consumer_processes},
+    {"amqp_consumer_event_key", STR_PARAM, &dbk_consumer_event_key.s},
+    {"amqp_consumer_event_subkey", STR_PARAM, &dbk_consumer_event_subkey.s},
+    {"amqp_query_timout_micro", INT_PARAM, &kz_qtimeout_tv.tv_usec},
+    {"amqp_query_timout_sec", INT_PARAM, &kz_qtimeout_tv.tv_sec},
+    {"amqp_internal_loop_count", INT_PARAM, &dbk_internal_loop_count},
+    {"amqp_consumer_loop_count", INT_PARAM, &dbk_consumer_loop_count},
+    {"amqp_consumer_ack_loop_count", INT_PARAM, &dbk_consumer_ack_loop_count},
+    {"pua_include_entity", INT_PARAM, &dbk_include_entity},
+    {"presentity_table", STR_PARAM, &kz_presentity_table.s},
+	{"db_url", STR_PARAM, &kz_db_url.s},
+    {"pua_mode", INT_PARAM, &dbk_pua_mode},
+    {"single_consumer_on_reconnect", INT_PARAM, &dbk_single_consumer_on_reconnect},
+    {"consume_messages_on_reconnect", INT_PARAM, &dbk_consume_messages_on_reconnect},
+    {0, 0, 0}
+};
+
+
+struct module_exports exports = {
+    "kazoo",
+    DEFAULT_DLFLAGS,		/* dlopen flags */
+    cmds,
+    params,			/* module parameters */
+    0,				/* exported statistics */
+    0,			/* exported MI functions */
+    kz_mod_pvs,				/* exported pseudo-variables */
+    0,				/* extra processes */
+    mod_init,			/* module initialization function */
+    0,				/* response function */
+    mod_destroy,		/* destroy function */
+    mod_child_init				/* per-child init function */
+};
+
+
+
+static int kz_initialize_bindings() {
+    LM_DBG("kz_initialize_bindings\n");
+
+    /* load all TM stuff */
+    if (load_tm_api(&tmb) == -1) {
+    	LM_ERR("Can't load tm functions. Module TM not loaded?\n");
+    	return -1;
+    }
+
+    return 0;
+}
+
+static int mod_init(void) {
+	int i;
+    startup_time = (int) time(NULL);
+
+
+    if (dbk_node_hostname.s == NULL) {
+	LM_ERR("You must set the node_hostname parameter\n");
+	return -1;
+    }
+    dbk_node_hostname.len = strlen(dbk_node_hostname.s);
+
+    dbk_consumer_event_key.len = strlen(dbk_consumer_event_key.s);
+   	dbk_consumer_event_subkey.len = strlen(dbk_consumer_event_subkey.s);
+
+    kz_amqp_init();
+
+    if(kz_initialize_bindings() == -1) {
+   		LM_ERR("Error initializing bindings\n");
+   		return -1;
+   	}
+
+    if(dbk_pua_mode == 1) {
+		kz_db_url.len = kz_db_url.s ? strlen(kz_db_url.s) : 0;
+		LM_DBG("db_url=%s/%d/%p\n", ZSW(kz_db_url.s), kz_db_url.len,kz_db_url.s);
+		kz_presentity_table.len = strlen(kz_presentity_table.s);
+
+		if(kz_db_url.len > 0) {
+
+			/* binding to database module  */
+			if (db_bind_mod(&kz_db_url, &kz_pa_dbf))
+			{
+				LM_ERR("Database module not found\n");
+				return -1;
+			}
+
+
+			if (!DB_CAPABILITY(kz_pa_dbf, DB_CAP_ALL))
+			{
+				LM_ERR("Database module does not implement all functions"
+						" needed by kazoo module\n");
+				return -1;
+			}
+
+			kz_pa_db = kz_pa_dbf.init(&kz_db_url);
+			if (!kz_pa_db)
+			{
+				LM_ERR("Connection to database failed\n");
+				return -1;
+			}
+
+			kz_pa_dbf.close(kz_pa_db);
+			kz_pa_db = NULL;
+		}
+    }
+
+
+    int total_workers = dbk_consumer_processes + 1;
+    int total_pipes = total_workers + 1;
+    kz_pipe_fds = (int*) shm_malloc(sizeof(int) * (total_pipes) * 2 );
+
+    for(i=0; i < total_pipes; i++) {
+    	kz_pipe_fds[i*2] = kz_pipe_fds[i*2+1] = -1;
+		if (pipe(&kz_pipe_fds[i*2]) < 0) {
+			LM_ERR("pipe(%d) failed\n", i);
+			return -1;
+		}
+    }
+
+    register_procs(total_workers);
+
+    return 0;
+}
+
+int mod_register(char *path, int *dlflags, void *p1, void *p2)
+{
+	if(kz_tr_init_buffers()<0)
+	{
+		LM_ERR("failed to initialize transformations buffers\n");
+		return -1;
+	}
+	return register_trans_mod(path, mod_trans);
+}
+
+
+static int mod_child_init(int rank)
+{
+	int pid;
+	int i;
+
+	fire_init_event(rank);
+
+	if (rank==PROC_INIT || rank==PROC_TCP_MAIN)
+		return 0;
+
+
+	if (rank==PROC_MAIN) {
+		pid=fork_process(1, "AMQP Manager", 0);
+		if (pid<0)
+			return -1; /* error */
+		if(pid==0){
+			kz_amqp_manager_loop(0);
+		}
+		else {
+			for(i=0; i < dbk_consumer_processes; i++) {
+				pid=fork_process(i+2, "AMQP Consumer", 0);
+				if (pid<0)
+					return -1; /* error */
+				if(pid==0){
+					mod_consumer_proc(i+1);
+				}
+			}
+		}
+
+		return 0;
+	}
+
+		if(dbk_pua_mode == 1) {
+			if (kz_pa_dbf.init==0)
+			{
+				LM_CRIT("child_init: database not bound\n");
+				return -1;
+			}
+			kz_pa_db = kz_pa_dbf.init(&kz_db_url);
+			if (!kz_pa_db)
+			{
+				LM_ERR("child %d: unsuccessful connecting to database\n", rank);
+				return -1;
+			}
+
+			if (kz_pa_dbf.use_table(kz_pa_db, &kz_presentity_table) < 0)
+			{
+				LM_ERR( "child %d:unsuccessful use_table presentity_table\n", rank);
+				return -1;
+			}
+			LM_DBG("child %d: Database connection opened successfully\n", rank);
+	}
+
+	return 0;
+}
+
+static void  mod_consumer_proc(int rank)
+{
+	kz_amqp_consumer_loop(rank);
+}
+
+
+static int fire_init_event(int rank)
+{
+	struct sip_msg *fmsg;
+	struct run_act_ctx ctx;
+	int rtb, rt;
+
+	LM_DBG("rank is (%d)\n", rank);
+	if (rank!=PROC_INIT)
+		return 0;
+
+	rt = route_get(&event_rt, "kazoo:mod-init");
+	if(rt>=0 && event_rt.rlist[rt]!=NULL) {
+		LM_DBG("executing event_route[kazoo:mod-init] (%d)\n", rt);
+		if(faked_msg_init()<0)
+			return -1;
+		fmsg = faked_msg_next();
+		rtb = get_route_type();
+		set_route_type(REQUEST_ROUTE);
+		init_run_actions_ctx(&ctx);
+		run_top_route(event_rt.rlist[rt], fmsg, &ctx);
+		if(ctx.run_flags&DROP_R_F)
+		{
+			LM_ERR("exit due to 'drop' in event route\n");
+			return -1;
+		}
+		set_route_type(rtb);
+	}
+
+	return 0;
+}
+
+
+static void mod_destroy(void) {
+	kz_amqp_destroy();
+    shm_free(kz_pipe_fds);
+}
+
+

+ 1731 - 0
modules/kazoo/kz_amqp.c

@@ -0,0 +1,1731 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <amqp.h>
+#include <amqp_framing.h>
+#include <amqp_tcp_socket.h>
+#include <json/json.h>
+#include "../../mem/mem.h"
+#include "../../timer_proc.h"
+#include "../../sr_module.h"
+#include "../../pvar.h"
+#include "../../mod_fix.h"
+#include "../../lvalue.h"
+
+
+#include "kz_amqp.h"
+
+#define RET_AMQP_ERROR 2
+
+
+kz_amqp_conn_pool_ptr kz_pool = NULL;
+kz_amqp_bindings_ptr kz_bindings = NULL;
+
+static unsigned long rpl_query_routing_key_count = 0;
+
+typedef struct json_object *json_obj_ptr;
+
+kz_amqp_channel_ptr channels = NULL;
+int channel_index = 0;
+extern int *kz_pipe_fds;
+
+extern struct timeval kz_sock_tv;
+extern struct timeval kz_amqp_tv;
+extern struct timeval kz_qtimeout_tv;
+extern struct timeval kz_ack_tv;
+
+extern int dbk_internal_loop_count;
+extern int dbk_consumer_loop_count;
+extern int dbk_consumer_ack_loop_count;
+
+
+extern int dbk_single_consumer_on_reconnect;
+extern int dbk_consume_messages_on_reconnect;
+
+static char *kz_amqp_str_dup(str *src)
+{
+	char *res;
+
+	if (!src || !src->s)
+		return NULL;
+	if (!(res = (char *) shm_malloc(src->len + 1)))
+		return NULL;
+	strncpy(res, src->s, src->len);
+	res[src->len] = 0;
+	return res;
+}
+
+static char *kz_amqp_string_dup(char *src)
+{
+	char *res;
+	int sz;
+	if (!src )
+		return NULL;
+
+	sz = strlen(src);
+	if (!(res = (char *) shm_malloc(sz + 1)))
+		return NULL;
+	strncpy(res, src, sz);
+	res[sz] = 0;
+	return res;
+}
+
+char *kz_amqp_bytes_dup(amqp_bytes_t bytes)
+{
+	char *res;
+	int sz;
+	if (!bytes.bytes )
+		return NULL;
+
+	sz = bytes.len;
+	if (!(res = (char *) shm_malloc(sz + 1)))
+		return NULL;
+	strncpy(res, bytes.bytes, sz);
+	res[sz] = 0;
+	return res;
+}
+
+void kz_amqp_bytes_free(amqp_bytes_t bytes)
+{
+  shm_free(bytes.bytes);
+}
+
+amqp_bytes_t kz_amqp_bytes_malloc_dup(amqp_bytes_t src)
+{
+  amqp_bytes_t result = {0, 0};
+  result.len = src.len;
+  result.bytes = shm_malloc(src.len+1);
+  if (result.bytes != NULL) {
+    memcpy(result.bytes, src.bytes, src.len);
+    ((char*)result.bytes)[result.len] = '\0';
+  }
+  return result;
+}
+
+amqp_bytes_t kz_amqp_bytes_dup_from_string(char *src)
+{
+	return kz_amqp_bytes_malloc_dup(amqp_cstring_bytes(src));
+}
+
+amqp_bytes_t kz_amqp_bytes_dup_from_str(str *src)
+{
+	return kz_amqp_bytes_malloc_dup(amqp_cstring_bytes(src->s));
+}
+
+void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
+{
+	if(bind == NULL)
+		return;
+	if(bind->exchange.bytes)
+		kz_amqp_bytes_free(bind->exchange);
+	if(bind->exchange_type.bytes)
+		kz_amqp_bytes_free(bind->exchange_type);
+	if(bind->queue.bytes)
+		kz_amqp_bytes_free(bind->queue);
+	if(bind->routing_key.bytes)
+		kz_amqp_bytes_free(bind->routing_key);
+	shm_free(bind);
+}
+
+void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd)
+{
+	if(cmd == NULL)
+		return;
+	if (cmd->exchange)
+		shm_free(cmd->exchange);
+	if (cmd->exchange_type)
+		shm_free(cmd->exchange_type);
+	if (cmd->queue)
+		shm_free(cmd->queue);
+	if (cmd->routing_key)
+		shm_free(cmd->routing_key);
+	if (cmd->reply_routing_key)
+		shm_free(cmd->reply_routing_key);
+	if (cmd->payload)
+		shm_free(cmd->payload);
+	if (cmd->return_payload)
+		shm_free(cmd->return_payload);
+	lock_release(&cmd->lock);
+	lock_destroy(&cmd->lock);
+	shm_free(cmd);
+}
+
+kz_amqp_cmd_ptr kz_amqp_alloc_pipe_cmd()
+{
+	kz_amqp_cmd_ptr cmd = (kz_amqp_cmd_ptr)shm_malloc(sizeof(kz_amqp_cmd));
+	if(cmd == NULL) {
+		LM_ERR("failed to allocate kz_amqp_cmd in process %d\n", getpid());
+		return NULL;
+	}
+	memset(cmd, 0, sizeof(kz_amqp_cmd));
+	if(lock_init(&cmd->lock)==NULL)  {
+		LM_ERR("cannot init the lock for pipe command in process %d\n", getpid());
+		lock_dealloc(&cmd->lock);
+		kz_amqp_free_pipe_cmd(cmd);
+		return NULL;
+	}
+	lock_get(&cmd->lock);
+	return cmd;
+}
+
+kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queue, str* routing_key )
+{
+    kz_amqp_bind_ptr bind = NULL;
+
+    bind = (kz_amqp_bind_ptr)shm_malloc(sizeof(kz_amqp_bind));
+	if(bind == NULL) {
+		LM_ERR("error allocation memory for bind alloc\n");
+		goto error;
+	}
+	memset(bind, 0, sizeof(kz_amqp_bind));
+
+	if(exchange != NULL) {
+		bind->exchange = kz_amqp_bytes_dup_from_str(exchange);
+	    if (bind->exchange.bytes == NULL) {
+			LM_ERR("Out of memory allocating for exchange\n");
+			goto error;
+	    }
+	}
+
+	if(exchange_type != NULL) {
+		bind->exchange_type = kz_amqp_bytes_dup_from_str(exchange_type);
+	    if (bind->exchange_type.bytes == NULL) {
+			LM_ERR("Out of memory allocating for exchange type\n");
+			goto error;
+	    }
+	}
+
+	if(queue != NULL) {
+		bind->queue = kz_amqp_bytes_dup_from_str(queue);
+	    if (bind->queue.bytes == NULL) {
+			LM_ERR("Out of memory allocating for queue\n");
+			goto error;
+	    }
+	}
+
+	if(routing_key != NULL) {
+		bind->routing_key = kz_amqp_bytes_dup_from_str(routing_key);
+	    if (bind->routing_key.bytes == NULL) {
+			LM_ERR("Out of memory allocating for routing key\n");
+			goto error;
+	    }
+	}
+
+	return bind;
+
+error:
+	kz_amqp_free_bind(bind);
+    return NULL;
+}
+
+void kz_amqp_init_connection_pool() {
+	if(kz_pool == NULL) {
+		kz_pool = (kz_amqp_conn_pool_ptr) shm_malloc(sizeof(kz_amqp_conn_pool));
+		memset(kz_pool, 0, sizeof(kz_amqp_conn_pool));
+	}
+}
+
+void kz_amqp_init() {
+	int i;
+	kz_amqp_init_connection_pool();
+	if(kz_bindings == NULL) {
+		kz_bindings = (kz_amqp_bindings_ptr) shm_malloc(sizeof(kz_amqp_bindings));
+		memset(kz_bindings, 0, sizeof(kz_amqp_bindings));
+	}
+	if(channels == NULL) {
+		channels = shm_malloc(dbk_channels * sizeof(kz_amqp_channel));
+		memset(channels, 0, dbk_channels * sizeof(kz_amqp_channel));
+		for(i=0; i < dbk_channels; i++) {
+			channels[i].channel = i+1;
+		}
+	}
+}
+
+void kz_amqp_destroy() {
+	int i;
+	if(channels != NULL) {
+		for(i=0; i < dbk_channels; i++) {
+			if(channels[i].targeted != NULL) {
+				kz_amqp_free_bind(channels[i].targeted);
+			}
+		}
+		shm_free(channels);
+	}
+
+
+	if(kz_bindings != NULL) {
+		kz_amqp_binding_ptr binding = kz_bindings->head;
+		while(binding != NULL) {
+			kz_amqp_binding_ptr free = binding;
+			binding = binding->next;
+			if(free->bind != NULL) {
+				kz_amqp_free_bind(free->bind);
+			}
+			shm_free(free);
+		}
+		shm_free(kz_bindings);
+	}
+
+	if(kz_pool != NULL) {
+		kz_amqp_conn_ptr conn = kz_pool->head;
+		while(conn != NULL) {
+			kz_amqp_conn_ptr free = conn;
+			conn = conn->next;
+			shm_free(free);
+		}
+		shm_free(kz_pool);
+	}
+
+
+}
+
+int kz_amqp_add_connection(modparam_t type, void* val)
+{
+	kz_amqp_init_connection_pool(); // find a better way
+
+	kz_amqp_conn_ptr newConn = shm_malloc(sizeof(kz_amqp_conn));
+	memset(newConn, 0, sizeof(kz_amqp_conn));
+
+	if(kz_pool->head == NULL)
+		kz_pool->head = newConn;
+
+	if(kz_pool->tail != NULL)
+		kz_pool->tail->next = newConn;
+
+	kz_pool->tail = newConn;
+
+    amqp_parse_url((char*)val, &newConn->info);
+
+	return 0;
+}
+
+void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
+    LM_DBG("Close rmq connection\n");
+    if (!rmq)
+    	return;
+
+    if (rmq->conn) {
+		LM_DBG("close connection:  %d rmq(%p)->conn(%p)\n", getpid(), (void *)rmq, rmq->conn);
+		kz_amqp_error("closing connection", amqp_connection_close(rmq->conn, AMQP_REPLY_SUCCESS));
+		if (amqp_destroy_connection(rmq->conn) < 0) {
+			LM_ERR("cannot destroy connection\n");
+		}
+		rmq->conn = NULL;
+		rmq->socket = NULL;
+		rmq->channel_count = 0;
+
+//	   	lock_release(&kz_pool->lock);
+
+    }
+
+}
+
+void kz_amqp_channel_close(kz_amqp_conn_ptr rmq, amqp_channel_t channel) {
+    LM_DBG("Close rmq channel\n");
+    if (!rmq)
+    	return;
+
+	LM_DBG("close channel: %d rmq(%p)->channel(%d)\n", getpid(), (void *)rmq, channel);
+	kz_amqp_error("closing channel", amqp_channel_close(rmq->conn, channel, AMQP_REPLY_SUCCESS));
+}
+
+int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) {
+	rmq->channel_count = rmq->channel_counter = 0;
+    if (!(rmq->conn = amqp_new_connection())) {
+    	LM_DBG("Failed to create new AMQP connection\n");
+    	goto error;
+    }
+
+    rmq->socket = amqp_tcp_socket_new(rmq->conn);
+    if (!rmq->socket) {
+    	LM_DBG("Failed to create TCP socket to AMQP broker\n");
+    	goto error;
+    }
+
+    if (amqp_socket_open(rmq->socket, rmq->info.host, rmq->info.port)) {
+    	LM_DBG("Failed to open TCP socket to AMQP broker\n");
+    	goto error;
+    }
+
+    if (kz_amqp_error("Logging in", amqp_login(rmq->conn,
+					   "/", //rmq->info.vhost,
+					   0,
+					   131072,
+					   0,
+					   AMQP_SASL_METHOD_PLAIN,
+					   rmq->info.user,
+					   rmq->info.password))) {
+
+    	LM_ERR("Login to AMQP broker failed!\n");
+    	goto error;
+    }
+
+    return 0;
+
+ error:
+    kz_amqp_connection_close(rmq);
+    return -1;
+}
+
+int kz_amqp_channel_open(kz_amqp_conn_ptr rmq, amqp_channel_t channel) {
+	if(rmq == NULL) {
+		LM_DBG("rmq == NULL \n");
+		return -1;
+	}
+
+    amqp_channel_open(rmq->conn, channel);
+    if (kz_amqp_error("Opening channel", amqp_get_rpc_reply(rmq->conn))) {
+    	LM_ERR("Failed to open channel AMQP %d!\n", channel);
+    	return -1;
+    }
+
+    return 0;
+}
+
+kz_amqp_conn_ptr kz_amqp_get_connection() {
+	kz_amqp_conn_ptr ptr = NULL;
+	if(kz_pool == NULL) {
+		return NULL;
+	}
+//	lock_get(&kz_pool->lock);
+
+	ptr = kz_pool->head;
+
+	if(kz_pool->current != NULL) {
+		ptr = kz_pool->current;
+	}
+
+	if(ptr->socket == NULL )
+	{
+	while(ptr != NULL) {
+		if(kz_amqp_connection_open(ptr) == 0) {
+			kz_pool->current = ptr;
+			break;
+		}
+		ptr = ptr->next;
+	}
+	}
+
+//	lock_release(&kz_pool->lock);
+
+   	return ptr;
+}
+
+kz_amqp_conn_ptr kz_amqp_get_next_connection() {
+	kz_amqp_conn_ptr ptr = NULL;
+	if(kz_pool == NULL) {
+		return NULL;
+	}
+
+	if(kz_pool->current != NULL) {
+		ptr = kz_pool->current->next;
+	}
+
+	if(ptr == NULL) {
+		ptr = kz_pool->head;
+	}
+
+	while(ptr != NULL) {
+		if(kz_amqp_connection_open(ptr) == 0) {
+			kz_pool->current = ptr;
+			break;
+		}
+		ptr = ptr->next;
+	}
+
+
+   	return ptr;
+}
+
+int kz_amqp_consume_error(amqp_connection_state_t conn)
+{
+	amqp_frame_t frame;
+	int ret = 0;
+	amqp_rpc_reply_t reply;
+
+	if (AMQP_STATUS_OK != amqp_simple_wait_frame_noblock(conn, &frame, &kz_amqp_tv)) {
+		// should i ignore this or close the connection?
+		LM_ERR("ERROR ON SIMPLE_WAIT_FRAME\n");
+		return ret;
+	}
+
+	if (AMQP_FRAME_METHOD == frame.frame_type) {
+		switch (frame.payload.method.id) {
+		case AMQP_BASIC_ACK_METHOD:
+			/* if we've turned publisher confirms on, and we've published a message
+			 * here is a message being confirmed
+			 */
+			ret = 1;
+			break;
+		case AMQP_BASIC_RETURN_METHOD:
+			/* if a published message couldn't be routed and the mandatory flag was set
+			 * this is what would be returned. The message then needs to be read.
+			 */
+			{
+				ret = 1;
+			amqp_message_t message;
+			reply = amqp_read_message(conn, frame.channel, &message, 0);
+			if (AMQP_RESPONSE_NORMAL != reply.reply_type) {
+				LM_ERR("AMQP_BASIC_RETURN_METHOD read_message\n");
+				break;
+			}
+
+			LM_DBG("Received this message : %.*s\n", (int) message.body.len, (char*)message.body.bytes);
+			amqp_destroy_message(&message);
+			}
+			break;
+
+		case AMQP_CHANNEL_CLOSE_METHOD:
+			/* a channel.close method happens when a channel exception occurs, this
+			 * can happen by publishing to an exchange that doesn't exist for example
+			 *
+			 * In this case you would need to open another channel redeclare any queues
+			 * that were declared auto-delete, and restart any consumers that were attached
+			 * to the previous channel
+			 */
+			LM_ERR("AMQP_CHANNEL_CLOSE_METHOD\n");
+			if(frame.channel > 0)
+				channels[frame.channel-1].state = KZ_AMQP_CLOSED;
+			break;
+
+		case AMQP_CONNECTION_CLOSE_METHOD:
+			/* a connection.close method happens when a connection exception occurs,
+			 * this can happen by trying to use a channel that isn't open for example.
+			 *
+			 * In this case the whole connection must be restarted.
+			 */
+			break;
+
+		default:
+			LM_ERR("An unexpected method was received %d\n", frame.payload.method.id);
+			break;
+		}
+	};
+
+	return ret;
+}
+
+void kz_amqp_add_payload_common_properties(json_obj_ptr json_obj, char* server_id, str* unique) {
+    char node_name[512];
+
+
+    json_object_object_add(json_obj, BLF_JSON_APP_NAME,
+			   json_object_new_string(NAME));
+    json_object_object_add(json_obj, BLF_JSON_APP_VERSION,
+			   json_object_new_string(VERSION));
+    sprintf(node_name, "kamailio@%.*s", dbk_node_hostname.len, dbk_node_hostname.s);
+    json_object_object_add(json_obj, BLF_JSON_NODE,
+			   json_object_new_string(node_name));
+//    json_object_object_add(json_obj, BLF_JSON_SERVERID,
+//			   json_object_new_string(server_id));
+    json_object_object_add(json_obj, BLF_JSON_MSG_ID,
+			   json_object_new_string_len(unique->s, unique->len));
+
+}
+
+int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
+{
+	int ret = 1;
+    json_obj_ptr json_obj = NULL;
+    kz_amqp_cmd_ptr cmd = NULL;
+
+    str unique_string = { 0, 0 };
+    char serverid[512];
+
+    tmb.generate_callid(&unique_string);
+    sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++);
+
+
+    /* parse json  and add extra fields */
+    json_obj = json_tokener_parse(str_payload->s);
+    if (is_error(json_obj))
+    {
+		LM_ERR("Error parsing json: %s\n",json_tokener_errors[-(unsigned long)json_obj]);
+		LM_ERR("%s\n", str_payload->s);
+		goto error;
+    }
+
+    kz_amqp_add_payload_common_properties(json_obj, serverid, &unique_string);
+
+    char *payload = (char *)json_object_to_json_string(json_obj);
+
+	cmd = (kz_amqp_cmd_ptr)shm_malloc(sizeof(kz_amqp_cmd));
+	if(cmd == NULL) {
+		LM_ERR("failed to allocate kz_amqp_cmd in process %d\n", getpid());
+		goto error;
+	}
+	memset(cmd, 0, sizeof(kz_amqp_cmd));
+	cmd->exchange = kz_amqp_str_dup(str_exchange);
+	cmd->routing_key = kz_amqp_str_dup(str_routing_key);
+	cmd->payload = kz_amqp_string_dup(payload);
+	if(cmd->payload == NULL || cmd->routing_key == NULL || cmd->exchange == NULL) {
+		LM_ERR("failed to allocate kz_amqp_cmd parameters in process %d\n", getpid());
+		goto error;
+	}
+	if(lock_init(&cmd->lock)==NULL)
+	{
+		LM_ERR("cannot init the lock for publishing in process %d\n", getpid());
+		lock_dealloc(&cmd->lock);
+		goto error;
+	}
+	lock_get(&cmd->lock);
+	cmd->type = KZ_AMQP_PUBLISH;
+	cmd->consumer = getpid();
+	if (write(kz_pipe_fds[1], &cmd, sizeof(cmd)) != sizeof(cmd)) {
+		LM_ERR("failed to publish message to amqp in process %d, write to command pipe: %s\n", getpid(), strerror(errno));
+	} else {
+		lock_get(&cmd->lock);
+		ret = 1;//cmd->return_code;
+	}
+
+	error:
+
+	if(cmd)
+		kz_amqp_free_pipe_cmd(cmd);
+
+    if(json_obj)
+    	json_object_put(json_obj);
+
+	return ret;
+}
+
+int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_payload, json_obj_ptr* json_ret )
+{
+	int ret = 1;
+    json_obj_ptr json_obj = NULL;
+    kz_amqp_cmd_ptr cmd = NULL;
+    json_obj_ptr json_body = NULL;
+
+    str unique_string = { 0, 0 };
+    char serverid[512];
+
+    tmb.generate_callid(&unique_string);
+    sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++);
+
+
+    /* parse json  and add extra fields */
+    json_obj = json_tokener_parse(str_payload->s);
+    if (is_error(json_obj))
+    {
+		LM_ERR("Error parsing json: %s\n",json_tokener_errors[-(unsigned long)json_obj]);
+		LM_ERR("%s\n", str_payload->s);
+		goto error;
+    }
+
+    kz_amqp_add_payload_common_properties(json_obj, serverid, &unique_string);
+
+    char *payload = (char *)json_object_to_json_string(json_obj);
+
+	cmd = (kz_amqp_cmd_ptr)shm_malloc(sizeof(kz_amqp_cmd));
+	if(cmd == NULL) {
+		LM_ERR("failed to allocate kz_amqp_cmd in process %d\n", getpid());
+		goto error;
+	}
+	memset(cmd, 0, sizeof(kz_amqp_cmd));
+	cmd->exchange = kz_amqp_str_dup(str_exchange);
+	cmd->routing_key = kz_amqp_str_dup(str_routing_key);
+	cmd->reply_routing_key = kz_amqp_string_dup(serverid);
+	cmd->payload = kz_amqp_string_dup(payload);
+	cmd->timeout = kz_qtimeout_tv;
+	if(cmd->payload == NULL || cmd->routing_key == NULL || cmd->exchange == NULL) {
+		LM_ERR("failed to allocate kz_amqp_cmd parameters in process %d\n", getpid());
+		goto error;
+	}
+	if(lock_init(&cmd->lock)==NULL)
+	{
+		LM_ERR("cannot init the lock for publishing in process %d\n", getpid());
+		lock_dealloc(&cmd->lock);
+		goto error;
+	}
+	lock_get(&cmd->lock);
+	cmd->type = KZ_AMQP_CALL;
+	cmd->consumer = getpid();
+	if (write(kz_pipe_fds[1], &cmd, sizeof(cmd)) != sizeof(cmd)) {
+		LM_ERR("failed to publish message to amqp in process %d, write to command pipe: %s\n", getpid(), strerror(errno));
+	} else {
+		lock_get(&cmd->lock);
+		switch(cmd->return_code) {
+		case AMQP_RESPONSE_NORMAL:
+			json_body = json_tokener_parse(cmd->return_payload);
+		    if (is_error(json_body))
+		    {
+				LM_ERR("Error parsing body json: %s\n",json_tokener_errors[-(unsigned long)json_body]);
+				LM_ERR("JSON : %s\n", cmd->return_payload);
+				goto error;
+		    }
+		    *json_ret = json_body;
+		    ret = 0;
+		    break;
+
+		default:
+			ret = -1;
+			break;
+		}
+	}
+
+ error:
+	if(cmd)
+		kz_amqp_free_pipe_cmd(cmd);
+
+    if(json_obj)
+    	json_object_put(json_obj);
+
+    return ret;
+}
+
+int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
+{
+	  str json_s;
+	  str exchange_s;
+	  str routing_key_s;
+
+		if (fixup_get_svalue(msg, (gparam_p)exchange, &exchange_s) != 0) {
+			LM_ERR("cannot get exchange string value\n");
+			return -1;
+		}
+
+		if (fixup_get_svalue(msg, (gparam_p)routing_key, &routing_key_s) != 0) {
+			LM_ERR("cannot get routing_key string value\n");
+			return -1;
+		}
+
+		if (fixup_get_svalue(msg, (gparam_p)payload, &json_s) != 0) {
+			LM_ERR("cannot get json string value : %s\n", payload);
+			return -1;
+		}
+
+		struct json_object *j = json_tokener_parse(json_s.s);
+
+		if (is_error(j)) {
+			LM_ERR("empty or invalid JSON payload : %.*s\n", json_s.len, json_s.s);
+			return -1;
+		}
+
+		json_object_put(j);
+
+		return kz_amqp_pipe_send(&exchange_s, &routing_key_s, &json_s );
+
+
+};
+
+
+char* last_payload_result = NULL;
+
+int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res)
+{
+	return last_payload_result == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, last_payload_result);
+}
+
+
+int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
+{
+	  str json_s;
+	  str exchange_s;
+	  str routing_key_s;
+
+	  if(last_payload_result)
+		pkg_free(last_payload_result);
+
+	  last_payload_result = NULL;
+
+		if (fixup_get_svalue(msg, (gparam_p)exchange, &exchange_s) != 0) {
+			LM_ERR("cannot get exchange string value\n");
+			return -1;
+		}
+
+		if (fixup_get_svalue(msg, (gparam_p)routing_key, &routing_key_s) != 0) {
+			LM_ERR("cannot get routing_key string value\n");
+			return -1;
+		}
+
+		if (fixup_get_svalue(msg, (gparam_p)payload, &json_s) != 0) {
+			LM_ERR("cannot get json string value : %s\n", payload);
+			return -1;
+		}
+
+		struct json_object *j = json_tokener_parse(json_s.s);
+
+		if (is_error(j)) {
+			LM_ERR("empty or invalid JSON payload : %*.s\n", json_s.len, json_s.s);
+			return -1;
+		}
+
+		json_object_put(j);
+
+		json_obj_ptr ret = NULL;
+		int res = kz_amqp_pipe_send_receive(&exchange_s, &routing_key_s, &json_s, &ret );
+
+		if(res != 0) {
+			return -1;
+		}
+
+		char* strjson = json_object_to_json_string(ret);
+		int len = strlen(strjson);
+		char* value = pkg_malloc(len+1);
+		memcpy(value, strjson, len);
+		value[len] = '\0';
+		last_payload_result = value;
+		json_object_put(ret);
+
+		return 1;
+};
+
+int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst)
+{
+
+	  pv_spec_t *dst_pv;
+	  pv_value_t dst_val;
+
+	  int result = kz_amqp_query_ex(msg, exchange, routing_key, payload);
+	  if(result == -1)
+		  return result;
+
+		dst_pv = (pv_spec_t *)dst;
+		if(last_payload_result != NULL) {
+			dst_val.rs.s = last_payload_result;
+			dst_val.rs.len = strlen(last_payload_result);
+			dst_val.flags = PV_VAL_STR;
+		} else {
+			dst_val.rs.s = NULL;
+			dst_val.rs.len = 0;
+			dst_val.ri = 0;
+			dst_val.flags = PV_VAL_NULL;
+		}
+		dst_pv->setf(msg, &dst_pv->pvp, (int)EQ_T, &dst_val);
+
+		return 1;
+};
+
+int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue, char* routing_key)
+{
+	str exchange_s;
+	str exchange_type_s;
+	str queue_s;
+	str routing_key_s;
+
+	if (fixup_get_svalue(msg, (gparam_p)exchange, &exchange_s) != 0) {
+		LM_ERR("cannot get exchange string value\n");
+		return -1;
+	}
+
+	if (fixup_get_svalue(msg, (gparam_p)exchange_type, &exchange_type_s) != 0) {
+		LM_ERR("cannot get exchange type string value\n");
+		return -1;
+	}
+
+	if (fixup_get_svalue(msg, (gparam_p)queue, &queue_s) != 0) {
+		LM_ERR("cannot get queue string value\n");
+		return -1;
+	}
+
+	if (fixup_get_svalue(msg, (gparam_p)routing_key, &routing_key_s) != 0) {
+		LM_ERR("cannot get routing_key string value\n");
+		return -1;
+	}
+
+	kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s);
+	if(bind == NULL) {
+		LM_ERR("Could not allocate bind struct\n");
+		goto error;
+	}
+
+	bind->auto_delete = 1;
+	bind->no_ack = 1;
+
+	kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding));
+	if(binding == NULL) {
+		LM_ERR("Could not allocate binding struct\n");
+		goto error;
+	}
+	memset(binding, 0, sizeof(kz_amqp_binding));
+
+	if(kz_bindings->head == NULL)
+		kz_bindings->head = binding;
+
+	if(kz_bindings->tail != NULL)
+		kz_bindings->tail->next = binding;
+
+	kz_bindings->tail = binding;
+	binding->bind = bind;
+
+    return 1;
+
+error:
+    if(binding != NULL)
+    	shm_free(binding);
+
+	return -1;
+
+}
+
+int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
+{
+	str exchange_s;
+	str exchange_type_s;
+	str queue_s;
+	str routing_key_s;
+	str payload_s;
+	int passive = 0;
+	int durable = 0;
+	int exclusive = 0;
+	int auto_delete = 1;
+	int no_ack = 1;
+	int wait_for_consumer_ack = 1;
+
+    json_obj_ptr json_obj = NULL;
+	struct json_object* tmpObj = NULL;
+
+	if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) {
+		LM_ERR("cannot get payload value\n");
+		return -1;
+	}
+
+    json_obj = json_tokener_parse(payload_s.s);
+    if (is_error(json_obj))
+    {
+		LM_ERR("Error parsing json: %s\n",json_tokener_errors[-(unsigned long)json_obj]);
+		LM_ERR("%s\n", payload_s.s);
+		return -1;
+    }
+
+    json_extract_field("exchange", exchange_s);
+    json_extract_field("type", exchange_type_s);
+    json_extract_field("queue", queue_s);
+    json_extract_field("routing", routing_key_s);
+
+    tmpObj = json_object_object_get(json_obj, "passive");
+    if(tmpObj != NULL) {
+    	passive = json_object_get_int(tmpObj);
+    }
+
+    tmpObj = json_object_object_get(json_obj, "durable");
+    if(tmpObj != NULL) {
+    	durable = json_object_get_int(tmpObj);
+    }
+
+    tmpObj = json_object_object_get(json_obj, "exclusive");
+    if(tmpObj != NULL) {
+    	exclusive = json_object_get_int(tmpObj);
+    }
+
+    tmpObj = json_object_object_get(json_obj, "auto_delete");
+    if(tmpObj != NULL) {
+    	auto_delete = json_object_get_int(tmpObj);
+    }
+
+    tmpObj = json_object_object_get(json_obj, "no_ack");
+    if(tmpObj != NULL) {
+    	no_ack = json_object_get_int(tmpObj);
+    }
+
+    tmpObj = json_object_object_get(json_obj, "wait_for_consumer_ack");
+    if(tmpObj != NULL) {
+    	wait_for_consumer_ack = json_object_get_int(tmpObj);
+    }
+
+
+	kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s);
+	if(bind == NULL) {
+		LM_ERR("Could not allocate bind struct\n");
+		goto error;
+	}
+
+	bind->durable = durable;
+	bind->passive = passive;
+	bind->exclusive = exclusive;
+	bind->auto_delete = auto_delete;
+	bind->no_ack = no_ack;
+	bind->wait_for_consumer_ack = wait_for_consumer_ack;
+
+
+	kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding));
+	if(binding == NULL) {
+		LM_ERR("Could not allocate binding struct\n");
+		goto error;
+	}
+	memset(binding, 0, sizeof(kz_amqp_binding));
+
+	if(kz_bindings->head == NULL)
+		kz_bindings->head = binding;
+
+	if(kz_bindings->tail != NULL)
+		kz_bindings->tail->next = binding;
+
+	kz_bindings->tail = binding;
+	binding->bind = bind;
+
+    if(json_obj != NULL)
+       	json_object_put(json_obj);
+
+    return 1;
+
+error:
+    if(binding != NULL)
+    	shm_free(binding);
+
+    if(json_obj != NULL)
+       	json_object_put(json_obj);
+
+	return -1;
+
+}
+
+
+#define KEY_SAFE(C)  ((C >= 'a' && C <= 'z') || \
+                      (C >= 'A' && C <= 'Z') || \
+                      (C >= '0' && C <= '9') || \
+                      (C == '-' || C == '~'  || C == '_'))
+
+#define HI4(C) (C>>4)
+#define LO4(C) (C & 0x0F)
+
+#define hexint(C) (C < 10?('0' + C):('A'+ C - 10))
+
+char *kz_amqp_util_encode(const str * key, char *dest) {
+    if ((key->len == 1) && (key->s[0] == '#' || key->s[0] == '*')) {
+	*dest++ = key->s[0];
+	return dest;
+    }
+    char *p, *end;
+    for (p = key->s, end = key->s + key->len; p < end; p++) {
+	if (KEY_SAFE(*p)) {
+	    *dest++ = *p;
+	} else if (*p == '.') {
+	    memcpy(dest, "\%2E", 3);
+	    dest += 3;
+	} else if (*p == ' ') {
+	    *dest++ = '+';
+	} else {
+	    *dest++ = '%';
+	    sprintf(dest, "%c%c", hexint(HI4(*p)), hexint(LO4(*p)));
+	    dest += 2;
+	}
+    }
+    *dest = '\0';
+    return dest;
+}
+
+int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val)
+{
+	char routing_key_buff[256];
+	memset(routing_key_buff,0, sizeof(routing_key_buff));
+	kz_amqp_util_encode(unencoded, routing_key_buff);
+
+	int len = strlen(routing_key_buff);
+	dst_val->rs.s = pkg_malloc(len+1);
+	memcpy(dst_val->rs.s, routing_key_buff, len);
+	dst_val->rs.s[len] = '\0';
+	dst_val->rs.len = len;
+	dst_val->flags = PV_VAL_STR | PV_VAL_PKG;
+
+	return 1;
+
+}
+
+int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded)
+{
+    str unencoded_s;
+	pv_spec_t *dst_pv;
+	pv_value_t dst_val;
+	dst_pv = (pv_spec_t *)encoded;
+
+	if (fixup_get_svalue(msg, (gparam_p)unencoded, &unencoded_s) != 0) {
+		LM_ERR("cannot get unencoded string value\n");
+		return -1;
+	}
+
+	kz_amqp_encode_ex(&unencoded_s, &dst_val);
+	dst_pv->setf(msg, &dst_pv->pvp, (int)EQ_T, &dst_val);
+
+	if(dst_val.flags & PV_VAL_PKG)
+		pkg_free(dst_val.rs.s);
+	else if(dst_val.flags & PV_VAL_SHM)
+		shm_free(dst_val.rs.s);
+
+
+	return 1;
+
+}
+
+int get_channel_index() {
+	int n;
+	for(n=channel_index; n < dbk_channels; n++)
+		if(channels[n].state == KZ_AMQP_FREE) {
+			channel_index = n+1;
+			return n;
+		}
+	if(channel_index == 0) {
+		LM_ERR("max channels (%d) reached. please exit kazoo and change db_kazoo amqp_max_channels param", dbk_channels);
+		return -1;
+	}
+	channel_index = 0;
+	return get_channel_index();
+}
+
+/*
+int kz_amqp_unbind_channel(kz_amqp_conn_ptr kz_conn, int idx )
+{
+    kz_amqp_bind_ptr reply = channels[idx].targeted;
+    int ret = 0;
+	if(reply == NULL) {
+		LM_ERR("unbinding channel NULL??\n");
+		ret = -1;
+		goto error;
+	}
+
+    if (amqp_basic_cancel(kz_conn->conn, channels[idx].channel, amqp_empty_bytes) < 0
+	    || kz_amqp_error("Canceling", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		ret = -RET_AMQP_ERROR;
+		goto error;
+    }
+
+    if (amqp_queue_unbind(kz_conn->conn, channels[idx].channel, reply->queue, reply->exchange, reply->routing_key, amqp_empty_table) < 0
+	    || kz_amqp_error("Unbinding queue", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		ret = -RET_AMQP_ERROR;
+		goto error;
+    }
+
+    amqp_queue_delete(kz_conn->conn, channels[idx].channel, reply->queue, 0, 0);
+
+    kz_amqp_free_binding(reply);
+    channels[idx].targeted = NULL;
+    channels[idx].state = KZ_AMQP_FREE;
+
+error:
+	return ret;
+}
+*/
+
+
+int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int idx )
+{
+    kz_amqp_bind_ptr bind = NULL;
+    amqp_queue_declare_ok_t *r = NULL;
+    str rpl_exch = str_init("targeted");
+    str rpl_exch_type = str_init("direct");
+    int ret = -1;
+    char serverid[512];
+
+    bind = (kz_amqp_bind_ptr)shm_malloc(sizeof(kz_amqp_bind));
+	if(bind == NULL) {
+		LM_ERR("error allocation memory for reply\n");
+		goto error;
+	}
+	memset(bind, 0, sizeof(kz_amqp_bind));
+
+	bind->exchange = kz_amqp_bytes_dup_from_str(&rpl_exch);
+	bind->exchange_type = kz_amqp_bytes_dup_from_str(&rpl_exch_type);
+
+    sprintf(serverid, "kamailio@%.*s-<%d-%d-%d>", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), loopcount, idx);
+    bind->queue = kz_amqp_bytes_dup_from_string(serverid);
+
+    sprintf(serverid, "kamailio@%.*s-<%d-%d>-targeted-%d", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), loopcount, idx);
+    bind->routing_key = kz_amqp_bytes_dup_from_string(serverid);
+
+
+    if (bind->exchange.bytes == NULL || bind->routing_key.bytes == NULL || bind->queue.bytes == NULL) {
+		LM_ERR("Out of memory allocating for exchange/routing_key\n");
+		goto error;
+    }
+
+    r = amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, amqp_empty_table);
+    if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		goto error;
+    }
+
+	amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, amqp_empty_table);
+    if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		ret = -RET_AMQP_ERROR;
+		goto error;
+    }
+
+    if (amqp_queue_bind(kz_conn->conn, channels[idx].channel, bind->queue, bind->exchange, bind->routing_key, amqp_empty_table) < 0
+	    || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		goto error;
+    }
+
+    if (amqp_basic_consume(kz_conn->conn, channels[idx].channel, bind->queue, amqp_empty_bytes, 0, 1, 1, amqp_empty_table) < 0
+	    || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		goto error;
+    }
+
+    channels[idx].targeted = bind;
+    return 0;
+ error:
+	kz_amqp_free_bind(bind);
+    return ret;
+}
+
+int kz_amqp_bind_targeted_channels(kz_amqp_conn_ptr kz_conn , int loopcount)
+{
+	int i, ret;
+	for(i = 0; i < dbk_channels; i++) {
+		ret = kz_amqp_bind_targeted_channel(kz_conn, loopcount, i);
+		if(ret != 0)
+			return ret;
+	}
+	return 0;
+}
+
+
+int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind)
+{
+    int ret = -1;
+
+    int	idx = get_channel_index();
+
+    amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, amqp_empty_table);
+    if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		ret = -RET_AMQP_ERROR;
+		goto error;
+    }
+
+	amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, amqp_empty_table);
+    if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		ret = -RET_AMQP_ERROR;
+		goto error;
+    }
+
+    LM_DBG("QUEUE BIND\n");
+    if (amqp_queue_bind(kz_conn->conn, channels[idx].channel, bind->queue, bind->exchange, bind->routing_key, amqp_empty_table) < 0
+	    || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		ret = -RET_AMQP_ERROR;
+		goto error;
+    }
+
+    LM_DBG("BASIC CONSUME\n");
+    if (amqp_basic_consume(kz_conn->conn, channels[idx].channel, bind->queue, amqp_empty_bytes, 0, bind->no_ack, 0, amqp_empty_table) < 0
+	    || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
+    {
+		ret = -RET_AMQP_ERROR;
+		goto error;
+    }
+
+    channels[idx].state = KZ_AMQP_CONSUMING;
+	channels[idx].consumer = bind;
+    ret = idx;
+ error:
+
+    return ret;
+}
+
+
+int kz_amqp_send_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, kz_amqp_channel_state state, int idx)
+{
+	amqp_bytes_t exchange;
+	amqp_bytes_t routing_key;
+	amqp_bytes_t payload;
+	int ret = 1;
+    json_obj_ptr json_obj = NULL;
+
+	amqp_basic_properties_t props;
+	memset(&props, 0, sizeof(amqp_basic_properties_t));
+	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
+	props.content_type = amqp_cstring_bytes("application/json");
+
+	if(idx == -1) {
+		idx = get_channel_index();
+		if(idx == -1) {
+			LM_ERR("Failed to get channel index to publish\n");
+			goto error;
+		}
+	}
+
+    exchange = amqp_bytes_malloc_dup(amqp_cstring_bytes(cmd->exchange));
+    routing_key = amqp_bytes_malloc_dup(amqp_cstring_bytes(cmd->routing_key));
+    payload = amqp_bytes_malloc_dup(amqp_cstring_bytes(cmd->payload));
+
+    json_obj = json_tokener_parse(cmd->payload);
+    if (is_error(json_obj))
+    {
+		LM_ERR("Error parsing json: %s\n",json_tokener_errors[-(unsigned long)json_obj]);
+		LM_ERR("%s\n", cmd->payload);
+		goto error;
+    }
+
+    if(json_object_object_get(json_obj, BLF_JSON_SERVERID) == NULL) {
+        json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)channels[idx].targeted->routing_key.bytes));
+    	amqp_bytes_free(payload);
+        payload = amqp_bytes_malloc_dup(amqp_cstring_bytes((char*)json_object_to_json_string(json_obj)));
+    }
+
+
+	amqp_basic_publish(kz_conn->conn, channels[idx].channel, exchange, routing_key, 0, 0, &props, payload);
+
+	if ( kz_amqp_error("Publishing",  amqp_get_rpc_reply(kz_conn->conn)) ) {
+		LM_ERR("Failed to publish\n");
+		ret = -1;
+		goto error;
+	}
+	channels[idx].state = state;
+	channels[idx].cmd = cmd;
+
+	ret = idx;
+
+	error:
+
+	if(json_obj)
+    	json_object_put(json_obj);
+
+	if(exchange.bytes)
+		amqp_bytes_free(exchange);
+	if(routing_key.bytes)
+		amqp_bytes_free(routing_key);
+	if(payload.bytes)
+		amqp_bytes_free(payload);
+
+	return ret;
+}
+
+int kz_amqp_send(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd)
+{
+	return kz_amqp_send_ex(kz_conn, cmd, KZ_AMQP_PUBLISHING , -1);
+}
+
+
+int kz_amqp_send_receive_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, int idx )
+{
+//	int newidx = kz_amqp_bind_channel_ex(kz_conn, cmd, idx);
+//	if(newidx >= 0)
+//		return kz_amqp_send_ex(kz_conn, cmd, KZ_AMQP_CALLING, newidx);
+		return kz_amqp_send_ex(kz_conn, cmd, KZ_AMQP_CALLING, idx);
+//	return newidx;
+}
+
+int kz_amqp_send_receive(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd )
+{
+	return kz_amqp_send_receive_ex(kz_conn, cmd, -1 );
+}
+
+char* eventData = NULL;
+
+int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res)
+{
+	return eventData == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, eventData);
+}
+
+int kz_amqp_consumer_fire_event(char *eventkey)
+{
+	struct sip_msg *fmsg;
+	struct run_act_ctx ctx;
+	int rtb, rt;
+
+	LM_DBG("searching event_route[%s]\n", eventkey);
+	rt = route_get(&event_rt, eventkey);
+	if (rt < 0 || event_rt.rlist[rt] == NULL)
+	{
+		LM_DBG("route %s does not exist\n", eventkey);
+		return -2;
+	}
+	LM_DBG("executing event_route[%s] (%d)\n", eventkey, rt);
+	if(faked_msg_init()<0)
+		return -2;
+	fmsg = faked_msg_next();
+	rtb = get_route_type();
+	set_route_type(REQUEST_ROUTE);
+	init_run_actions_ctx(&ctx);
+	run_top_route(event_rt.rlist[rt], fmsg, 0);
+	set_route_type(rtb);
+	return 0;
+
+}
+
+void kz_amqp_consumer_event(int child_no, char *payload)
+{
+    json_obj_ptr json_obj = NULL;
+    str ev_name = {0, 0}, ev_category = {0, 0};
+    char buffer[512];
+    char * p;
+
+    eventData = payload;
+
+    json_obj = json_tokener_parse(payload);
+    if (is_error(json_obj))
+    {
+		LM_ERR("Error parsing json: %s\n",json_tokener_errors[-(unsigned long)json_obj]);
+		LM_ERR("%s\n", payload);
+		return;
+    }
+
+    json_extract_field(dbk_consumer_event_key.s, ev_category);
+    json_extract_field(dbk_consumer_event_subkey.s, ev_name);
+
+    sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
+    for (p=buffer ; *p; ++p) *p = tolower(*p);
+    for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+    if(kz_amqp_consumer_fire_event(buffer) != 0) {
+        sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",dbk_consumer_event_key.len, dbk_consumer_event_key.s, dbk_consumer_event_subkey.len, dbk_consumer_event_subkey.s);
+        for (p=buffer ; *p; ++p) *p = tolower(*p);
+        for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+        if(kz_amqp_consumer_fire_event(buffer) != 0) {
+            sprintf(buffer, "kazoo:consumer-event");
+            if(kz_amqp_consumer_fire_event(buffer) != 0) {
+                LM_ERR("kazoo:consumer-event not found");
+            }
+        }
+    }
+	if(json_obj)
+    	json_object_put(json_obj);
+
+	eventData = NULL;
+}
+
+void kz_amqp_consumer_loop(int child_no)
+{
+
+	LM_DBG("starting consumer %d\n", child_no);
+	close(kz_pipe_fds[child_no*2+1]);
+	int data_pipe = kz_pipe_fds[child_no*2];
+	int back_idx = (dbk_consumer_processes+1)*2+1;
+
+	fd_set fdset;
+    int selret;
+
+    while(1) {
+    	FD_ZERO(&fdset);
+    	FD_SET(data_pipe, &fdset);
+
+    	selret = select(FD_SETSIZE, &fdset, NULL, NULL, NULL);
+
+    	if (selret < 0) {
+    		LM_ERR("select() failed: %s\n", strerror(errno));
+    	} else if (!selret) {
+    	} else {
+			if(FD_ISSET(data_pipe, &fdset)) {
+				kz_amqp_consumer_delivery_ptr ptr;
+				if(read(data_pipe, &ptr, sizeof(ptr)) == sizeof(ptr)) {
+					LM_DBG("consumer %d received payload %s\n", child_no, ptr->payload);
+					kz_amqp_consumer_event(child_no, ptr->payload);
+					if(ptr->channel > 0 && ptr->delivery_tag > 0) {
+						kz_amqp_cmd_ptr cmd = kz_amqp_alloc_pipe_cmd();
+						cmd->type = KZ_AMQP_ACK;
+						cmd->channel = ptr->channel;
+						cmd->delivery_tag = ptr->delivery_tag;
+						if (write(kz_pipe_fds[back_idx], &cmd, sizeof(cmd)) != sizeof(cmd)) {
+							LM_ERR("failed to send ack to AMQP Manager in process %d, write to command pipe: %s\n", getpid(), strerror(errno));
+						}
+					}
+					shm_free(ptr->payload);
+					shm_free(ptr);
+				}
+			}
+    	}
+	}
+	LM_DBG("exiting consumer %d\n", child_no);
+}
+
+int check_timeout(struct timeval *now, struct timeval *start, struct timeval *timeout)
+{
+	struct timeval chk;
+	chk.tv_sec = now->tv_sec - start->tv_sec;
+	chk.tv_usec = now->tv_usec - start->tv_usec;
+	if(chk.tv_usec >= timeout->tv_usec)
+		if(chk.tv_sec >= timeout->tv_sec)
+			return 1;
+	return 0;
+}
+
+int consumer = 1;
+
+void kz_amqp_send_consumer_event_ex(char* payload, amqp_channel_t channel, uint64_t delivery_tag, int nextConsumer)
+{
+	kz_amqp_consumer_delivery_ptr ptr = (kz_amqp_consumer_delivery_ptr) shm_malloc(sizeof(kz_amqp_consumer_delivery));
+	if(ptr == NULL) {
+		LM_ERR("NO MORE SHARED MEMORY!");
+		return;
+	}
+	memset(ptr, 0, sizeof(kz_amqp_consumer_delivery));
+	ptr->channel = channel;
+	ptr->delivery_tag = delivery_tag;
+	ptr->payload = payload;
+
+	if (write(kz_pipe_fds[consumer*2+1], &ptr, sizeof(ptr)) != sizeof(ptr)) {
+		LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), payload);
+	}
+
+	if(nextConsumer) {
+		consumer++;
+		if(consumer > dbk_consumer_processes) {
+			consumer = 1;
+		}
+	}
+}
+
+void kz_amqp_send_consumer_event(char* payload, int nextConsumer)
+{
+	kz_amqp_send_consumer_event_ex(payload, 0, 0, nextConsumer);
+}
+
+void kz_amqp_fire_connection_event(char *event, char* host)
+{
+	char* payload = (char*)shm_malloc(512);
+	sprintf(payload, "{ \"%.*s\" : \"connection\", \"%.*s\" : \"%s\", \"host\" : \"%s\" }",
+			dbk_consumer_event_key.len, dbk_consumer_event_key.s,
+			dbk_consumer_event_subkey.len, dbk_consumer_event_subkey.s,
+			event, host
+			);
+	kz_amqp_send_consumer_event(payload, 1);
+}
+
+void kz_amqp_manager_loop(int child_no)
+{
+	LM_DBG("starting manager %d\n", child_no);
+	close(kz_pipe_fds[child_no*2+1]);
+	close(kz_pipe_fds[(dbk_consumer_processes+1)*2+1]);
+	int data_pipe = kz_pipe_fds[child_no*2];
+	int back_pipe = kz_pipe_fds[(dbk_consumer_processes+1)*2];
+    fd_set fdset;
+    int i, idx;
+    int selret;
+	int INTERNAL_READ, CONSUME, ACK_READ,  OK;
+	int INTERNAL_READ_COUNT , INTERNAL_READ_MAX_LOOP;
+	int CONSUMER_READ_COUNT , CONSUMER_READ_MAX_LOOP;
+	int ACK_READ_COUNT , ACK_READ_MAX_LOOP;
+	char* payload;
+	int channel_res;
+    kz_amqp_conn_ptr kzconn;
+	kz_amqp_cmd_ptr cmd;
+    int loopcount = 0;
+    int firstLoop = dbk_consume_messages_on_reconnect;
+
+
+    while(1) {
+
+        INTERNAL_READ_MAX_LOOP = dbk_internal_loop_count;
+        CONSUMER_READ_MAX_LOOP = dbk_consumer_loop_count;
+        ACK_READ_MAX_LOOP = dbk_consumer_ack_loop_count;
+
+    	OK = 1;
+
+    	while(1) {
+    		kzconn = kz_amqp_get_next_connection();
+    		if(kzconn != NULL)
+    			break;
+    		LM_DBG("Connection failed : all servers down?");
+    		sleep(3);
+    	}
+
+    	kz_amqp_fire_connection_event("open", kzconn->info.host);
+
+    	loopcount++;
+    	for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) {
+    		/* start cleanup */
+    		channels[i].state = KZ_AMQP_CLOSED;
+    		channels[i].consumer = NULL;
+    		if(channels[i].targeted != NULL) {
+    			kz_amqp_free_bind(channels[i].targeted);
+    			channels[i].targeted = NULL;
+    		}
+    		cmd = channels[i].cmd;
+			if(cmd != NULL) {
+				channels[i].cmd = NULL;
+				cmd->return_code = -1;
+				lock_release(&cmd->lock);
+			}
+    		/* end cleanup */
+    		channel_res = kz_amqp_channel_open(kzconn, channels[i].channel);
+    		if(channel_res == 0) {
+    			kz_amqp_bind_targeted_channel(kzconn, loopcount, i);
+				channels[i].state = KZ_AMQP_FREE;
+    		}
+    	}
+    	channel_index = 0;
+    	/* bind consumers */
+    	if(kz_bindings != NULL) {
+    		kz_amqp_binding_ptr binding = kz_bindings->head;
+    		while(binding != NULL) {
+    			kz_amqp_bind_consumer(kzconn, binding->bind);
+    			binding = binding->next;
+    		}
+    	}
+
+    	firstLoop = dbk_consume_messages_on_reconnect;
+    	while(OK) {
+        	INTERNAL_READ = 1;
+    		CONSUME = 1;
+    		ACK_READ = 1;
+
+
+    		ACK_READ_COUNT = 0;
+        	while(ACK_READ && ACK_READ_COUNT < ACK_READ_MAX_LOOP) {
+        		ACK_READ_COUNT++;
+				FD_ZERO(&fdset);
+				FD_SET(back_pipe, &fdset);
+				selret = select(FD_SETSIZE, &fdset, NULL, NULL, &kz_ack_tv);
+				if (selret < 0) {
+					LM_ERR("select() failed: %s\n", strerror(errno));
+					break;
+				} else if (!selret) {
+					ACK_READ=0;
+				} else {
+					if(FD_ISSET(back_pipe, &fdset)) {
+						if(read(back_pipe, &cmd, sizeof(cmd)) == sizeof(cmd)) {
+							switch (cmd->type) {
+								case KZ_AMQP_ACK:
+									if(amqp_basic_ack(kzconn->conn, cmd->channel, cmd->delivery_tag, 0 ) < 0) {
+										LM_ERR("AMQP ERROR TRYING TO ACK A MSG RETURNED FROM CONSUMER\n");
+										OK = CONSUME = 0;
+									}
+									kz_amqp_free_pipe_cmd(cmd);
+									break;
+								default:
+									LM_DBG("unknown pipe cmd %d\n", cmd->type);
+									break;
+							}
+						}
+					}
+				}
+        	}
+    		INTERNAL_READ_COUNT = 0;
+        	while(INTERNAL_READ && INTERNAL_READ_COUNT < INTERNAL_READ_MAX_LOOP) {
+        		INTERNAL_READ_COUNT++;
+				FD_ZERO(&fdset);
+				FD_SET(data_pipe, &fdset);
+				selret = select(FD_SETSIZE, &fdset, NULL, NULL, &kz_sock_tv);
+				if (selret < 0) {
+					LM_ERR("select() failed: %s\n", strerror(errno));
+					break;
+				} else if (!selret) {
+					INTERNAL_READ=0;
+				} else {
+					if(FD_ISSET(data_pipe, &fdset)) {
+						if(read(data_pipe, &cmd, sizeof(cmd)) == sizeof(cmd)) {
+							switch (cmd->type) {
+							case KZ_AMQP_PUBLISH:
+								idx = kz_amqp_send(kzconn, cmd);
+								if(idx >= 0) {
+									cmd->return_code = AMQP_RESPONSE_NORMAL;
+								} else {
+									cmd->return_code = -1;
+									OK = INTERNAL_READ = CONSUME = 0;
+									LM_ERR("ERROR SENDING PUBLISH");
+								}
+								channels[idx].state = KZ_AMQP_FREE;
+								channels[idx].cmd = NULL;
+								lock_release(&cmd->lock);
+								break;
+							case KZ_AMQP_CALL:
+								idx = kz_amqp_send_receive(kzconn, cmd);
+								if(idx < 0) {
+									channels[idx].state = KZ_AMQP_FREE;
+									channels[idx].cmd = NULL;
+									cmd->return_code = -1;
+									lock_release(&cmd->lock);
+									LM_ERR("ERROR SENDING QUERY");
+									OK = INTERNAL_READ = CONSUME = 0;
+								} else {
+									gettimeofday(&channels[idx].timer, NULL);
+								}
+								break;
+							default:
+								LM_DBG("unknown pipe cmd %d\n", cmd->type);
+								break;
+							}
+						}
+					}
+				}
+        	}
+
+    		CONSUMER_READ_COUNT = 0;
+    	    while(CONSUME && (CONSUMER_READ_COUNT < CONSUMER_READ_MAX_LOOP || firstLoop)) {
+        		payload = NULL;
+        		CONSUMER_READ_COUNT++;
+				amqp_envelope_t envelope;
+				amqp_maybe_release_buffers(kzconn->conn);
+				amqp_rpc_reply_t reply = amqp_consume_message(kzconn->conn, &envelope, &kz_amqp_tv, 0);
+				switch(reply.reply_type) {
+				case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+					switch(reply.library_error) {
+					case AMQP_STATUS_HEARTBEAT_TIMEOUT:
+						LM_ERR("AMQP_STATUS_HEARTBEAT_TIMEOUT\n");
+						OK = CONSUME = 0;
+						break;
+					case AMQP_STATUS_TIMEOUT:
+						CONSUME = 0;
+						break;
+					case AMQP_STATUS_UNEXPECTED_STATE:
+						LM_DBG("AMQP_STATUS_UNEXPECTED_STATE\n");
+						OK = CONSUME = kz_amqp_consume_error(kzconn->conn);
+						break;
+					default:
+						OK = CONSUME = 0;
+						break;
+					};
+					break;
+
+				case AMQP_RESPONSE_NORMAL:
+					idx = envelope.channel-1;
+					switch(channels[idx].state) {
+					case KZ_AMQP_CALLING:
+						channels[idx].cmd->return_payload = kz_amqp_bytes_dup(envelope.message.body);
+						channels[idx].cmd->return_code = AMQP_RESPONSE_NORMAL;
+						lock_release(&channels[idx].cmd->lock);
+						channels[idx].state = KZ_AMQP_FREE;
+						channels[idx].cmd = NULL;
+						break;
+					case KZ_AMQP_CONSUMING:
+						kz_amqp_send_consumer_event_ex(kz_amqp_bytes_dup(envelope.message.body),
+								channels[idx].consumer->no_ack ? 0 : envelope.channel,
+								channels[idx].consumer->no_ack ? 0 : envelope.delivery_tag,
+								(firstLoop && dbk_single_consumer_on_reconnect) ? 0 : 1);
+						if(!channels[idx].consumer->no_ack ) {
+							if(channels[idx].consumer->wait_for_consumer_ack) {
+								LM_DBG("MSG ACK delayed until return from consumer\n");
+							} else {
+								if(amqp_basic_ack(kzconn->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) {
+									LM_ERR("AMQP ERROR TRYING TO ACK A MSG\n");
+									OK = CONSUME = 0;
+								}
+							}
+						}
+						break;
+					default:
+						break;
+					}
+					break;
+				case AMQP_RESPONSE_SERVER_EXCEPTION:
+					LM_ERR("AMQP_RESPONSE_SERVER_EXCEPTION in consume\n");
+					OK = CONSUME = 0;
+					break;
+
+				default:
+					LM_ERR("UNHANDLED AMQP_RESPONSE in consume\n");
+					OK = CONSUME = 0;
+					break;
+				};
+				amqp_destroy_envelope(&envelope);
+    	    }
+
+			/* check timeouts */
+			if(OK && (!firstLoop)) {
+				struct timeval now;
+				gettimeofday(&now, NULL);
+				for(i=0; i < dbk_channels; i++) {
+					if(channels[i].state == KZ_AMQP_CALLING
+							&& channels[i].cmd != NULL
+							&& check_timeout(&now, &channels[i].timer, &channels[i].cmd->timeout)) {
+						cmd = channels[i].cmd;
+						channels[i].state = KZ_AMQP_FREE;
+						channels[i].cmd = NULL;
+						cmd->return_code = -1;
+						lock_release(&cmd->lock);
+						// rebind ??
+						LM_ERR("QUERY TIMEOUT");
+					}
+				}
+			}
+			firstLoop = 0;
+    	}
+    	kz_amqp_connection_close(kzconn);
+    	kz_amqp_fire_connection_event("closed", kzconn->info.host);
+    }
+}

+ 189 - 0
modules/kazoo/kz_amqp.h

@@ -0,0 +1,189 @@
+/*
+ * kz_amqp.h
+ *
+ *  Created on: Jul 29, 2014
+ *      Author: root
+ */
+
+#ifndef KZ_AMQP_H_
+#define KZ_AMQP_H_
+
+#include <amqp.h>
+
+#include "../../sr_module.h"
+#include "../tm/tm_load.h"
+
+#include "const.h"
+#include "defs.h"
+#include "../../lib/kcore/faked_msg.h"
+
+typedef struct amqp_connection_info kz_amqp_connection_info;
+typedef kz_amqp_connection_info *kz_amqp_connection_info_ptr;
+
+extern int dbk_channels;
+extern str dbk_node_hostname;
+extern struct tm_binds tmb;
+extern str dbk_consumer_event_key;
+extern str dbk_consumer_event_subkey;
+extern int dbk_consumer_processes;
+
+
+typedef struct kz_amqp_conn_t {
+	kz_amqp_connection_info info;
+	amqp_connection_state_t conn;
+	amqp_socket_t *socket;
+	amqp_channel_t channel_count;
+	amqp_channel_t channel_counter;
+//    gen_lock_t lock;
+    struct kz_amqp_conn_t* next;
+} kz_amqp_conn, *kz_amqp_conn_ptr;
+
+typedef struct {
+	kz_amqp_conn_ptr current;
+	kz_amqp_conn_ptr head;
+	kz_amqp_conn_ptr tail;
+//    gen_lock_t lock;
+} kz_amqp_conn_pool, *kz_amqp_conn_pool_ptr;
+
+
+#define AMQP_KZ_CMD_PUBLISH       1
+#define AMQP_KZ_CMD_CALL          2
+#define AMQP_KZ_CMD_CONSUME       3
+
+typedef enum {
+	KZ_AMQP_PUBLISH     = 1,
+	KZ_AMQP_CALL    = 2,
+	KZ_AMQP_CONSUME = 3,
+	KZ_AMQP_ACK = 4
+} kz_amqp_pipe_cmd_type;
+
+typedef enum {
+	KZ_AMQP_CLOSED     = 0,
+	KZ_AMQP_FREE     = 1,
+	KZ_AMQP_PUBLISHING    = 2,
+	KZ_AMQP_BINDED = 3,
+	KZ_AMQP_CALLING    = 4,
+	KZ_AMQP_CONSUMING = 5
+} kz_amqp_channel_state;
+
+typedef struct {
+    gen_lock_t lock;
+	kz_amqp_pipe_cmd_type type;
+	char* exchange;
+	char* exchange_type;
+	char* routing_key;
+	char* reply_routing_key;
+	char* queue;
+	char* payload;
+	char* return_payload;
+	int   return_code;
+	int   consumer;
+	uint64_t delivery_tag;
+	amqp_channel_t channel;
+	struct timeval timeout;
+} kz_amqp_cmd, *kz_amqp_cmd_ptr;
+
+typedef struct {
+	char* payload;
+	uint64_t delivery_tag;
+	amqp_channel_t channel;
+} kz_amqp_consumer_delivery, *kz_amqp_consumer_delivery_ptr;
+
+typedef struct {
+	amqp_bytes_t exchange;
+	amqp_bytes_t exchange_type;
+	amqp_bytes_t routing_key;
+	amqp_bytes_t queue;
+	amqp_boolean_t passive;
+	amqp_boolean_t durable;
+	amqp_boolean_t exclusive;
+	amqp_boolean_t auto_delete;
+	amqp_boolean_t no_ack;
+	amqp_boolean_t wait_for_consumer_ack;
+} kz_amqp_bind, *kz_amqp_bind_ptr;
+
+typedef struct {
+	kz_amqp_cmd_ptr cmd;
+	kz_amqp_bind_ptr targeted;
+	kz_amqp_bind_ptr consumer;
+	amqp_channel_t channel;
+	kz_amqp_channel_state state;
+	struct timeval timer;
+} kz_amqp_channel, *kz_amqp_channel_ptr;
+
+typedef struct kz_amqp_binding_t {
+	kz_amqp_bind_ptr bind;
+    struct kz_amqp_binding_t* next;
+} kz_amqp_binding, *kz_amqp_binding_ptr;
+
+typedef struct {
+	kz_amqp_binding_ptr head;
+	kz_amqp_binding_ptr tail;
+} kz_amqp_bindings, *kz_amqp_bindings_ptr;
+
+void kz_amqp_init();
+void kz_amqp_destroy();
+int kz_amqp_add_connection(modparam_t type, void* val);
+
+int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
+int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst);
+int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
+int kz_amqp_subscribe(struct sip_msg* msg, char* payload);
+int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue_name, char* routing_key);
+int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded);
+int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val);
+//void kz_amqp_presence_consumer_loop(int child_no);
+void kz_amqp_consumer_loop(int child_no);
+
+//void kz_amqp_generic_consumer_loop(int child_no);
+void kz_amqp_manager_loop(int child_no);
+
+int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
+int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
+int kz_pv_get_connection_host(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
+
+static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x)
+{
+	amqp_connection_close_t *mconn;
+	amqp_channel_close_t *mchan;
+
+	switch (x.reply_type) {
+		case AMQP_RESPONSE_NORMAL:
+			return 0;
+
+		case AMQP_RESPONSE_NONE:
+			LM_ERR("%s: missing RPC reply type!", context);
+			break;
+
+		case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+			LM_ERR("%s: %s\n", context,  "(end-of-stream)");
+			break;
+
+		case AMQP_RESPONSE_SERVER_EXCEPTION:
+			switch (x.reply.id) {
+				case AMQP_CONNECTION_CLOSE_METHOD:
+					mconn = (amqp_connection_close_t *)x.reply.decoded;
+					LM_ERR("%s: server connection error %d, message: %.*s",
+							context, mconn->reply_code,
+							(int)mconn->reply_text.len,
+							(char *)mconn->reply_text.bytes);
+					break;
+				case AMQP_CHANNEL_CLOSE_METHOD:
+						mchan = (amqp_channel_close_t *)x.reply.decoded;
+					LM_ERR("%s: server channel error %d, message: %.*s",
+							context, mchan->reply_code,
+							(int)mchan->reply_text.len,
+							(char *)mchan->reply_text.bytes);
+					break;
+				default:
+					LM_ERR("%s: unknown server error, method id 0x%08X",
+							context, x.reply.id);
+					break;
+			}
+			break;
+	}
+	return -1;
+}
+
+
+#endif /* KZ_AMQP_H_ */

+ 137 - 0
modules/kazoo/kz_fixup.c

@@ -0,0 +1,137 @@
+/*
+ * kz_fixup.c
+ *
+ *  Created on: Aug 2, 2014
+ *      Author: root
+ */
+
+
+#include "../../mod_fix.h"
+#include "../../lvalue.h"
+
+#include "kz_fixup.h"
+
+int fixup_kz_json(void** param, int param_no)
+{
+  if (param_no == 1 || param_no == 2) {
+		return fixup_spve_null(param, 1);
+	}
+
+	if (param_no == 3) {
+		if (fixup_pvar_null(param, 1) != 0) {
+		    LM_ERR("failed to fixup result pvar\n");
+		    return -1;
+		}
+		if (((pv_spec_t *)(*param))->setf == NULL) {
+		    LM_ERR("result pvar is not writeble\n");
+		    return -1;
+		}
+		return 0;
+	}
+
+	LM_ERR("invalid parameter number <%d>\n", param_no);
+	return -1;
+}
+
+int fixup_kz_json_free(void** param, int param_no)
+{
+	if (param_no == 1 || param_no == 2) {
+		return fixup_free_spve_null(param, 1);
+	}
+
+	if (param_no == 3) {
+		return fixup_free_pvar_null(param, 1);
+	}
+
+	LM_ERR("invalid parameter number <%d>\n", param_no);
+	return -1;
+}
+
+
+int fixup_kz_amqp_encode(void** param, int param_no)
+{
+  if (param_no == 1 ) {
+		return fixup_spve_null(param, 1);
+	}
+
+	if (param_no == 2) {
+		if (fixup_pvar_null(param, 1) != 0) {
+		    LM_ERR("failed to fixup result pvar\n");
+		    return -1;
+		}
+		if (((pv_spec_t *)(*param))->setf == NULL) {
+		    LM_ERR("result pvar is not writeble\n");
+		    return -1;
+		}
+		return 0;
+	}
+
+	LM_ERR("invalid parameter number <%d>\n", param_no);
+	return -1;
+}
+
+int fixup_kz_amqp_encode_free(void** param, int param_no)
+{
+	if (param_no == 1 ) {
+		return fixup_free_spve_null(param, 1);
+	}
+
+	if (param_no == 2) {
+		return fixup_free_pvar_null(param, 1);
+	}
+
+	LM_ERR("invalid parameter number <%d>\n", param_no);
+	return -1;
+}
+
+
+
+int fixup_kz_amqp(void** param, int param_no)
+{
+  if (param_no == 1 || param_no == 2 || param_no == 3) {
+		return fixup_spve_null(param, 1);
+	}
+
+	if (param_no == 4) {
+		if (fixup_pvar_null(param, 1) != 0) {
+		    LM_ERR("failed to fixup result pvar\n");
+		    return -1;
+		}
+		if (((pv_spec_t *)(*param))->setf == NULL) {
+		    LM_ERR("result pvar is not writeble\n");
+		    return -1;
+		}
+		return 0;
+	}
+
+	LM_ERR("invalid parameter number <%d>\n", param_no);
+	return -1;
+}
+
+int fixup_kz_amqp_free(void** param, int param_no)
+{
+	if (param_no == 1 || param_no == 2 || param_no == 3) {
+		return fixup_free_spve_null(param, 1);
+	}
+
+	if (param_no == 4) {
+		return fixup_free_pvar_null(param, 1);
+	}
+
+	LM_ERR("invalid parameter number <%d>\n", param_no);
+	return -1;
+}
+
+
+int fixup_kz_amqp4(void** param, int param_no)
+{
+	return fixup_spve_str(param, 1);
+}
+
+int fixup_kz_amqp4_free(void** param, int param_no)
+{
+	return fixup_free_spve_str(param, 1);
+}
+
+
+

+ 25 - 0
modules/kazoo/kz_fixup.h

@@ -0,0 +1,25 @@
+/*
+ * kz_fixup.h
+ *
+ *  Created on: Aug 2, 2014
+ *      Author: root
+ */
+
+#ifndef KZ_FIXUP_H_
+#define KZ_FIXUP_H_
+
+int fixup_kz_amqp(void** param, int param_no);
+int fixup_kz_amqp_free(void** param, int param_no);
+
+int fixup_kz_amqp4(void** param, int param_no);
+int fixup_kz_amqp4_free(void** param, int param_no);
+
+int fixup_kz_json(void** param, int param_no);
+int fixup_kz_json_free(void** param, int param_no);
+
+int fixup_kz_amqp_encode(void** param, int param_no);
+int fixup_kz_amqp_encode_free(void** param, int param_no);
+
+
+
+#endif /* KZ_FIXUP_H_ */

+ 193 - 0
modules/kazoo/kz_json.c

@@ -0,0 +1,193 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Flowroute LLC (flowroute.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <json/json.h>
+
+#include "../../mod_fix.h"
+#include "../../lvalue.h"
+
+#include "kz_json.h"
+
+
+char** str_split(char* a_str, const char a_delim)
+{
+    char** result    = 0;
+    size_t count     = 0;
+    char* tmp        = a_str;
+    char* last_comma = 0;
+    char delim[2];
+    delim[0] = a_delim;
+    delim[1] = 0;
+    int len = 0;
+
+    /* Count how many elements will be extracted. */
+    while (*tmp)
+    {
+        if (a_delim == *tmp)
+        {
+            count++;
+            last_comma = tmp;
+        }
+        tmp++;
+    }
+
+    /* Add space for trailing token. */
+    count += last_comma < (a_str + strlen(a_str) - 1);
+
+    /* Add space for terminating null string so caller
+       knows where the list of returned strings ends. */
+    count++;
+
+    result = pkg_malloc(sizeof(char*) * count);
+
+    if (result)
+    {
+        size_t idx  = 0;
+        char* token = strtok(a_str, delim);
+
+        while (token)
+        {
+            assert(idx < count);
+            len = strlen(token);
+            char* ptr = pkg_malloc( (len+1) * sizeof(char));
+            *(result + idx) = ptr;
+        	memcpy(ptr, token, len);
+        	ptr[len] = '\0';
+            token = strtok(0, delim);
+            idx++;
+        }
+        assert(idx == count - 1);
+        *(result + idx) = 0;
+    }
+
+    return result;
+}
+
+
+int kz_json_get_field_ex(str* json, str* field, pv_value_p dst_val)
+{
+  char** tokens;
+  char* dup;
+  char f1[25], f2[25];//, f3[25];
+  int i;
+
+  dup = pkg_malloc(json->len+1);
+  memcpy(dup, json->s, json->len);
+  dup[json->len] = '\0';
+  struct json_object *j = json_tokener_parse(dup);
+  pkg_free(dup);
+
+  if (is_error(j)) {
+	  LM_ERR("empty or invalid JSON\n");
+	  return -1;
+  }
+
+  struct json_object *jtree = NULL;
+
+  dup = pkg_malloc(field->len+1);
+  memcpy(dup, field->s, field->len);
+  dup[field->len] = '\0';
+  tokens = str_split(dup, '.');
+  pkg_free(dup);
+
+    if (tokens)
+    {
+    	jtree = j;
+        for (i = 0; *(tokens + i); i++)
+        {
+        	if(jtree != NULL) {
+				str field = str_init(*(tokens + i));
+				// check for idx []
+				int sresult = sscanf(field.s, "%[^[][%[^]]]", f1, f2); //, f3);
+				LM_DBG("CHECK IDX %d - %s , %s, %s\n", sresult, field.s, f1, f2);
+
+				jtree = json_object_object_get(jtree, f1);
+				if(jtree != NULL) {
+					char *value = (char*)json_object_get_string(jtree);
+					LM_DBG("JTREE OK %s\n", value);
+				}
+				if(jtree != NULL && sresult > 1 && json_object_is_type(jtree, json_type_array)) {
+					int idx = atoi(f2);
+					jtree = json_object_array_get_idx(jtree, idx);
+					if(jtree != NULL) {
+						char *value = (char*)json_object_get_string(jtree);
+						LM_DBG("JTREE IDX OK %s\n", value);
+					}
+				}
+        	}
+            pkg_free(*(tokens + i));
+        }
+        pkg_free(tokens);
+    }
+
+	if(jtree != NULL) {
+		char *value = (char*)json_object_get_string(jtree);
+		int len = strlen(value);
+		dst_val->rs.s = pkg_malloc(len+1);
+		memcpy(dst_val->rs.s, value, len);
+		dst_val->rs.s[len] = '\0';
+		dst_val->rs.len = len;
+		dst_val->flags = PV_VAL_STR | PV_VAL_PKG;
+	} else {
+		dst_val->flags = PV_VAL_NULL;
+	}
+
+	json_object_put(j);
+
+	return 1;
+}
+
+
+int kz_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst)
+{
+  str json_s;
+  str field_s;
+  pv_spec_t *dst_pv;
+  pv_value_t dst_val;
+
+	if (fixup_get_svalue(msg, (gparam_p)json, &json_s) != 0) {
+		LM_ERR("cannot get json string value\n");
+		return -1;
+	}
+
+	if (fixup_get_svalue(msg, (gparam_p)field, &field_s) != 0) {
+		LM_ERR("cannot get field string value\n");
+		return -1;
+	}
+
+
+	if(kz_json_get_field_ex(&json_s, &field_s, &dst_val) != 1)
+		return -1;
+
+	dst_pv = (pv_spec_t *)dst;
+	dst_pv->setf(msg, &dst_pv->pvp, (int)EQ_T, &dst_val);
+	if(dst_val.flags & PV_VAL_PKG)
+		pkg_free(dst_val.rs.s);
+	else if(dst_val.flags & PV_VAL_SHM)
+		shm_free(dst_val.rs.s);
+
+	return 1;
+}

+ 19 - 0
modules/kazoo/kz_json.h

@@ -0,0 +1,19 @@
+/*
+ * kz_json.h
+ *
+ *  Created on: Aug 2, 2014
+ *      Author: root
+ */
+
+#ifndef KZ_JSON_H_
+#define KZ_JSON_H_
+
+#include "../../parser/msg_parser.h"
+
+
+int kz_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst);
+int kz_json_get_field_ex(str* json, str* field, pv_value_p dst_val);
+
+
+
+#endif /* KZ_JSON_H_ */

+ 454 - 0
modules/kazoo/kz_pua.c

@@ -0,0 +1,454 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <json/json.h>
+#include "../../mem/mem.h"
+#include "../../timer_proc.h"
+#include "../../sr_module.h"
+#include "../../lib/kmi/mi.h"
+#include "../presence/bind_presence.h"
+#include "../../pvar.h"
+
+#include "../pua/pua.h"
+#include "../pua/pua_bind.h"
+#include "../pua/send_publish.h"
+
+#include "kz_pua.h"
+#include "defs.h"
+#include "const.h"
+
+
+extern int dbk_dialog_expires;
+extern int dbk_presence_expires;
+extern int dbk_mwi_expires;
+extern int dbk_include_entity;
+extern int dbk_pua_mode;
+
+extern db1_con_t *kz_pa_db;
+extern db_func_t kz_pa_dbf;
+extern str kz_presentity_table;
+
+
+int kz_pua_update_presentity(str* event, str* realm, str* user, str* etag, str* sender, str* body, int expires, int reset)
+{
+	db_key_t query_cols[12];
+	db_op_t  query_ops[12];
+	db_val_t query_vals[12];
+	int n_query_cols = 0;
+	int ret = -1;
+	int use_replace = 1;
+
+	query_cols[n_query_cols] = &str_event_col;
+	query_ops[n_query_cols] = OP_EQ;
+	query_vals[n_query_cols].type = DB1_STR;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.str_val = *event;
+	n_query_cols++;
+
+	query_cols[n_query_cols] = &str_domain_col;
+	query_ops[n_query_cols] = OP_EQ;
+	query_vals[n_query_cols].type = DB1_STR;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.str_val = *realm;
+	n_query_cols++;
+
+	query_cols[n_query_cols] = &str_username_col;
+	query_ops[n_query_cols] = OP_EQ;
+	query_vals[n_query_cols].type = DB1_STR;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.str_val = *user;
+	n_query_cols++;
+
+	query_cols[n_query_cols] = &str_etag_col;
+	query_ops[n_query_cols] = OP_EQ;
+	query_vals[n_query_cols].type = DB1_STR;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.str_val = *etag;
+	n_query_cols++;
+
+	query_cols[n_query_cols] = &str_sender_col;
+	query_vals[n_query_cols].type = DB1_STR;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.str_val = *sender;
+	n_query_cols++;
+
+	query_cols[n_query_cols] = &str_body_col;
+	query_vals[n_query_cols].type = DB1_BLOB;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.str_val = *body;
+	n_query_cols++;
+
+	query_cols[n_query_cols] = &str_received_time_col;
+	query_vals[n_query_cols].type = DB1_INT;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.int_val = (int)time(NULL);
+	n_query_cols++;
+
+	query_cols[n_query_cols] = &str_expires_col;
+	query_vals[n_query_cols].type = DB1_INT;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.int_val = expires+(int)time(NULL);
+	n_query_cols++;
+
+	if (kz_pa_dbf.use_table(kz_pa_db, &kz_presentity_table) < 0)
+	{
+		LM_ERR("unsuccessful use_table\n");
+		goto error;
+	}
+
+	if (kz_pa_dbf.replace == NULL || reset > 0)
+	{
+		use_replace = 0;
+		LM_DBG("using delete/insert instead of replace\n");
+	}
+
+	if (kz_pa_dbf.start_transaction)
+	{
+		if (kz_pa_dbf.start_transaction(kz_pa_db, DB_LOCKING_WRITE) < 0)
+		{
+			LM_ERR("in start_transaction\n");
+			goto error;
+		}
+	}
+
+	if(use_replace) {
+		if (kz_pa_dbf.replace(kz_pa_db, query_cols, query_vals, n_query_cols, 4, 0) < 0)
+		{
+			LM_ERR("replacing record in database\n");
+			if (kz_pa_dbf.abort_transaction)
+			{
+				if (kz_pa_dbf.abort_transaction(kz_pa_db) < 0)
+					LM_ERR("in abort_transaction\n");
+			}
+			goto error;
+		}
+	} else {
+		if (kz_pa_dbf.delete(kz_pa_db, query_cols, query_ops, query_vals, 4-reset) < 0)
+		{
+			LM_ERR("deleting record in database\n");
+			if (kz_pa_dbf.abort_transaction)
+			{
+				if (kz_pa_dbf.abort_transaction(kz_pa_db) < 0)
+					LM_ERR("in abort_transaction\n");
+			}
+			goto error;
+		}
+		if (kz_pa_dbf.insert(kz_pa_db, query_cols, query_vals, n_query_cols) < 0)
+		{
+			LM_ERR("replacing record in database\n");
+			if (kz_pa_dbf.abort_transaction)
+			{
+				if (kz_pa_dbf.abort_transaction(kz_pa_db) < 0)
+					LM_ERR("in abort_transaction\n");
+			}
+			goto error;
+		}
+	}
+
+	if (kz_pa_dbf.end_transaction)
+	{
+		if (kz_pa_dbf.end_transaction(kz_pa_db) < 0)
+		{
+			LM_ERR("in end_transaction\n");
+			goto error;
+		}
+	}
+
+error:
+
+	return ret;
+}
+
+
+int kz_pua_publish_presence_to_presentity(struct json_object *json_obj) {
+    int ret = 1;
+    str from = { 0, 0 }, to = { 0, 0 };
+    str from_user = { 0, 0 }, to_user = { 0, 0 };
+    str from_realm = { 0, 0 }, to_realm = { 0, 0 };
+    str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
+    str state = { 0, 0 };
+    str direction = { 0, 0 };
+    str event = str_init("presence");
+    str presence_body = { 0, 0 };
+    str activity = str_init("");
+    str note = str_init("Idle");
+    str status = str_presence_status_online;
+    int expires = dbk_presence_expires;
+
+    char *body = (char *)pkg_malloc(PRESENCE_BODY_BUFFER_SIZE);
+    if(body == NULL) {
+    	LM_ERR("Error allocating buffer for publish\n");
+    	ret = -1;
+    	goto error;
+    }
+
+    json_extract_field(BLF_JSON_FROM, from);
+    json_extract_field(BLF_JSON_FROM_USER, from_user);
+    json_extract_field(BLF_JSON_FROM_REALM, from_realm);
+    json_extract_field(BLF_JSON_TO, to);
+    json_extract_field(BLF_JSON_TO_USER, to_user);
+    json_extract_field(BLF_JSON_TO_REALM, to_realm);
+    json_extract_field(BLF_JSON_CALLID, callid);
+    json_extract_field(BLF_JSON_FROMTAG, fromtag);
+    json_extract_field(BLF_JSON_TOTAG, totag);
+    json_extract_field(BLF_JSON_DIRECTION, direction);
+    json_extract_field(BLF_JSON_STATE, state);
+
+    struct json_object* ExpiresObj = json_object_object_get(json_obj, BLF_JSON_EXPIRES);
+    if(ExpiresObj != NULL) {
+    	expires = json_object_get_int(ExpiresObj);
+    }
+
+    if (!from_user.len || !to_user.len || !state.len) {
+    	LM_ERR("missing one of From / To / State\n");
+    	goto error;
+    }
+
+    if (!strcmp(state.s, "early")) {
+    	note = str_presence_note_busy;
+    	activity = str_presence_act_busy;
+
+    } else if (!strcmp(state.s, "confirmed")) {
+    	note = str_presence_note_otp;
+    	activity = str_presence_act_otp;
+
+    } else if (!strcmp(state.s, "offline")) {
+    	note = str_presence_note_offline;
+    	status = str_presence_status_offline;
+
+    } else {
+    	note = str_presence_note_idle;
+    }
+
+
+    sprintf(body, PRESENCE_BODY, from_user.s, callid.s, status.s, note.s, activity.s, note.s);
+
+    presence_body.s = body;
+    presence_body.len = strlen(body);
+
+    if(dbk_pua_mode == 1) {
+    	kz_pua_update_presentity(&event, &from_realm, &from_user, &callid, &from, &presence_body, expires, 1);
+    }
+
+ error:
+
+ if(body)
+	  pkg_free(body);
+
+ return ret;
+
+}
+
+int kz_pua_publish_mwi_to_presentity(struct json_object *json_obj) {
+    int ret = 1;
+    str event = str_init("message-summary");
+    str from = { 0, 0 }, to = { 0, 0 };
+    str from_user = { 0, 0 }, to_user = { 0, 0 };
+    str from_realm = { 0, 0 }, to_realm = { 0, 0 };
+    str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
+    str mwi_user = { 0, 0 }, mwi_waiting = { 0, 0 },
+        mwi_new = { 0, 0 }, mwi_saved = { 0, 0 },
+        mwi_urgent = { 0, 0 }, mwi_urgent_saved = { 0, 0 },
+        mwi_account = { 0, 0 }, mwi_body = { 0, 0 };
+    int expires = dbk_mwi_expires;
+
+    char *body = (char *)pkg_malloc(MWI_BODY_BUFFER_SIZE);
+    if(body == NULL) {
+    	LM_ERR("Error allocating buffer for publish\n");
+    	ret = -1;
+    	goto error;
+    }
+
+    json_extract_field(BLF_JSON_FROM, from);
+    json_extract_field(BLF_JSON_FROM_USER, from_user);
+    json_extract_field(BLF_JSON_FROM_REALM, from_realm);
+    json_extract_field(BLF_JSON_TO, to);
+    json_extract_field(BLF_JSON_TO_USER, to_user);
+    json_extract_field(BLF_JSON_TO_REALM, to_realm);
+    json_extract_field(BLF_JSON_CALLID, callid);
+    json_extract_field(BLF_JSON_FROMTAG, fromtag);
+    json_extract_field(BLF_JSON_TOTAG, totag);
+
+    json_extract_field(MWI_JSON_TO, mwi_user);
+    json_extract_field(MWI_JSON_WAITING, mwi_waiting);
+    json_extract_field(MWI_JSON_NEW, mwi_new);
+    json_extract_field(MWI_JSON_SAVED, mwi_saved);
+    json_extract_field(MWI_JSON_URGENT, mwi_urgent);
+    json_extract_field(MWI_JSON_URGENT_SAVED, mwi_urgent_saved);
+    json_extract_field(MWI_JSON_ACCOUNT, mwi_account);
+
+    struct json_object* ExpiresObj = json_object_object_get(json_obj, BLF_JSON_EXPIRES);
+    if(ExpiresObj != NULL) {
+    	expires = json_object_get_int(ExpiresObj);
+    }
+
+    sprintf(body, MWI_BODY, mwi_waiting.len, mwi_waiting.s,
+	    mwi_account.len, mwi_account.s, mwi_new.len, mwi_new.s,
+	    mwi_saved.len, mwi_saved.s, mwi_urgent.len, mwi_urgent.s,
+	    mwi_urgent_saved.len, mwi_urgent_saved.s);
+
+    mwi_body.s = body;
+    mwi_body.len = strlen(body);
+
+    if(dbk_pua_mode == 1) {
+    	kz_pua_update_presentity(&event, &from_realm, &from_user, &callid, &from, &mwi_body, expires, 1);
+    }
+
+ error:
+
+   if(body)
+	  pkg_free(body);
+
+
+   return ret;
+}
+
+int kz_pua_publish_dialoginfo_to_presentity(struct json_object *json_obj) {
+    int ret = 1;
+    str from = { 0, 0 }, to = { 0, 0 };
+    str from_user = { 0, 0 }, to_user = { 0, 0 };
+    str from_realm = { 0, 0 }, to_realm = { 0, 0 };
+    str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
+    str state = { 0, 0 };
+    str direction = { 0, 0 };
+    char sender_buf[1024];
+    str sender = {0, 0};
+    str dialoginfo_body = {0 , 0};
+    int expires = dbk_dialog_expires;
+    str event = str_init("dialog");
+    int reset = 0;
+
+    char *body = (char *)pkg_malloc(DIALOGINFO_BODY_BUFFER_SIZE);
+    if(body == NULL) {
+    	LM_ERR("Error allocating buffer for publish\n");
+    	ret = -1;
+    	goto error;
+    }
+
+
+    json_extract_field(BLF_JSON_FROM, from);
+    json_extract_field(BLF_JSON_FROM_USER, from_user);
+    json_extract_field(BLF_JSON_FROM_REALM, from_realm);
+    json_extract_field(BLF_JSON_TO, to);
+    json_extract_field(BLF_JSON_TO_USER, to_user);
+    json_extract_field(BLF_JSON_TO_REALM, to_realm);
+    json_extract_field(BLF_JSON_CALLID, callid);
+    json_extract_field(BLF_JSON_FROMTAG, fromtag);
+    json_extract_field(BLF_JSON_TOTAG, totag);
+    json_extract_field(BLF_JSON_DIRECTION, direction);
+    json_extract_field(BLF_JSON_STATE, state);
+
+    struct json_object* ExpiresObj = json_object_object_get(json_obj, BLF_JSON_EXPIRES);
+    if(ExpiresObj != NULL) {
+    	expires = json_object_get_int(ExpiresObj);
+    }
+
+    ExpiresObj = json_object_object_get(json_obj, "Flush-Level");
+    if(ExpiresObj != NULL) {
+    	reset = json_object_get_int(ExpiresObj);
+    }
+
+    if (!from_user.len || !to_user.len || !state.len) {
+    	LM_ERR("missing one of From / To / State\n");
+		goto error;
+    }
+
+    if(callid.len) {
+
+    	if(dbk_include_entity) {
+        sprintf(body, DIALOGINFO_BODY,
+        		from.len, from.s,
+        		callid.len, callid.s,
+        		callid.len, callid.s,
+        		fromtag.len, fromtag.s,
+        		totag.len, totag.s,
+        		direction.len, direction.s,
+        		state.len, state.s,
+        		from_user.len, from_user.s,
+        		from.len, from.s,
+        		to_user.len, to_user.s,
+        		to.len, to.s
+        		);
+    	} else {
+
+        sprintf(body, DIALOGINFO_BODY_2,
+        		from.len, from.s,
+        		callid.len, callid.s,
+        		callid.len, callid.s,
+        		fromtag.len, fromtag.s,
+        		totag.len, totag.s,
+        		direction.len, direction.s,
+        		state.len, state.s,
+        		from_user.len, from_user.s,
+        		to_user.len, to_user.s
+        		);
+    	}
+
+    } else {
+    	sprintf(body, DIALOGINFO_EMPTY_BODY, from_user.len, from_user.s);
+    }
+
+    sprintf(sender_buf, "sip:%s",callid.s);
+    sender.s = sender_buf;
+    sender.len = strlen(sender_buf);
+
+    dialoginfo_body.s = body;
+    dialoginfo_body.len = strlen(body);
+
+    if(dbk_pua_mode == 1) {
+    	kz_pua_update_presentity(&event, &from_realm, &from_user, &callid, &sender, &dialoginfo_body, expires, reset);
+    }
+
+ error:
+
+   if(body)
+	  pkg_free(body);
+
+
+ return ret;
+
+}
+
+int kz_pua_publish(struct sip_msg* msg, char *json) {
+    str event_name = { 0, 0 }, event_package = { 0, 0 };
+    struct json_object *json_obj = NULL;
+    int ret = 1;
+
+    if(dbk_pua_mode != 1) {
+    	LM_ERR("pua_mode must be 1 to publish\n");
+    	ret = -1;
+    	goto error;
+    }
+
+    /* extract info from json and construct xml */
+    json_obj = json_tokener_parse(json);
+    if (is_error(json_obj)) {
+    	LM_ERR("Error parsing json: %s\n", json_tokener_errors[-(unsigned long)json_obj]);
+    	LM_ERR("%s\n", json);
+    	ret = -1;
+    	goto error;
+    }
+
+    json_extract_field(BLF_JSON_EVENT_NAME, event_name);
+
+    if (event_name.len == 6 && strncmp(event_name.s, "update", 6) == 0) {
+    	json_extract_field(BLF_JSON_EVENT_PKG, event_package);
+    	if (event_package.len == str_event_dialog.len
+    			&& strncmp(event_package.s, str_event_dialog.s, event_package.len) == 0) {
+    		ret = kz_pua_publish_dialoginfo_to_presentity(json_obj);
+    	} else if (event_package.len == str_event_message_summary.len
+    			&& strncmp(event_package.s, str_event_message_summary.s, event_package.len) == 0) {
+    		ret = kz_pua_publish_mwi_to_presentity(json_obj);
+    	} else if (event_package.len == str_event_presence.len
+    			&& strncmp(event_package.s, str_event_presence.s, event_package.len) == 0) {
+    		ret = kz_pua_publish_presence_to_presentity(json_obj);
+    	}
+    }
+
+error:
+	if(json_obj)
+		json_object_put(json_obj);
+
+	return ret;
+}
+

+ 9 - 0
modules/kazoo/kz_pua.h

@@ -0,0 +1,9 @@
+#ifndef _DBK_PUA_
+#define _DBK_PUA_
+
+
+int kz_initialize_pua();
+int kz_pua_publish(struct sip_msg* msg, char *json);
+
+
+#endif

+ 324 - 0
modules/kazoo/kz_trans.c

@@ -0,0 +1,324 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2007 voice-system.ro
+ * Copyright (C) 2009 asipto.com
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+/*! \file
+ * \brief Support for transformations
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "../../dprint.h"
+#include "../../mem/mem.h"
+#include "../../ut.h" 
+#include "../../trim.h" 
+#include "../../pvapi.h"
+#include "../../dset.h"
+
+#include "../../parser/parse_param.h"
+#include "../../parser/parse_uri.h"
+#include "../../parser/parse_to.h"
+#include "../../parser/parse_nameaddr.h"
+
+#include "../../lib/kcore/strcommon.h"
+#include "kz_trans.h"
+#include "kz_json.h"
+#include "kz_amqp.h"
+
+
+
+/*! transformation buffer size */
+#define KZ_TR_BUFFER_SIZE 65536
+#define KZ_TR_BUFFER_SLOTS	8
+
+/*! transformation buffer */
+static char **_kz_tr_buffer_list = NULL;
+
+static char *_kz_tr_buffer = NULL;
+
+static int _kz_tr_buffer_idx = 0;
+
+/*!
+ *
+ */
+int kz_tr_init_buffers(void)
+{
+	int i;
+
+	_kz_tr_buffer_list = (char**)malloc(KZ_TR_BUFFER_SLOTS * sizeof(char*));
+	if(_kz_tr_buffer_list==NULL)
+		return -1;
+	for(i=0; i<KZ_TR_BUFFER_SLOTS; i++) {
+		_kz_tr_buffer_list[i] = (char*)malloc(KZ_TR_BUFFER_SIZE);
+		if(_kz_tr_buffer_list[i]==NULL)
+			return -1;
+	}
+	return 0;
+}
+
+/*!
+ *
+ */
+char *kz_tr_set_crt_buffer(void)
+{
+	_kz_tr_buffer = _kz_tr_buffer_list[_kz_tr_buffer_idx];
+	_kz_tr_buffer_idx = (_kz_tr_buffer_idx + 1) % KZ_TR_BUFFER_SLOTS;
+	return _kz_tr_buffer;
+}
+
+#define kz_tr_string_clone_result do { \
+		if(val->rs.len> KZ_TR_BUFFER_SIZE-1) { \
+			LM_ERR("result is too big\n"); \
+			return -1; \
+		} \
+		strncpy(_kz_tr_buffer, val->rs.s, val->rs.len); \
+		val->rs.s = _kz_tr_buffer; \
+	} while(0);
+
+/*!
+ * \brief Evaluate kazoo transformations
+ * \param msg SIP message
+ * \param tp transformation
+ * \param subtype transformation type
+ * \param val pseudo-variable
+ * \return 0 on success, -1 on error
+ */
+int kz_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val)
+{
+
+	if(val==NULL || (val->flags&PV_VAL_NULL))
+		return -1;
+
+	char* tofree = NULL;
+	int oldflags = 0;
+
+	kz_tr_set_crt_buffer();
+
+	switch(subtype)
+	{
+		case TR_KAZOO_ENCODE:
+			if(!(val->flags&PV_VAL_STR))
+				return -1;
+
+			oldflags = val->flags;
+			tofree = val->rs.s;
+
+			if( kz_amqp_encode_ex(&val->rs, val ) != 1) {
+				LM_ERR("error encoding value\n");
+				return -1;
+			}
+
+			// it seems that val memory is not freed
+			// event with flag set to PV_VAL_PKG
+
+			strncpy(_kz_tr_buffer, val->rs.s, val->rs.len);
+			if(val->flags & PV_VAL_PKG)
+				pkg_free(val->rs.s);
+			else if(val->flags & PV_VAL_SHM)
+				shm_free(val->rs.s);
+			_kz_tr_buffer[val->rs.len] = '\0';
+			val->flags = PV_VAL_STR;
+			val->ri = 0;
+			val->rs.s = _kz_tr_buffer;
+
+			if(oldflags & PV_VAL_PKG) {
+				pkg_free(tofree);
+			} else if(oldflags & PV_VAL_SHM) {
+				shm_free(tofree);
+			}
+
+
+			break;
+		case TR_KAZOO_JSON:
+			if(tp==NULL)
+			{
+				LM_ERR("kazoo json transform invalid parameter\n");
+				return -1;
+			}
+
+			oldflags = val->flags;
+			tofree = val->rs.s;
+
+			if(kz_json_get_field_ex(&val->rs, &tp->v.s, val ) != 1) {
+				LM_ERR("error getting json\n");
+				return -1;
+			}
+			// it seems that val memory is not freed
+			// event with flag set to PV_VAL_PKG
+
+			strncpy(_kz_tr_buffer, val->rs.s, val->rs.len);
+			if(val->flags & PV_VAL_PKG)
+				pkg_free(val->rs.s);
+			else if(val->flags & PV_VAL_SHM)
+				shm_free(val->rs.s);
+			_kz_tr_buffer[val->rs.len] = '\0';
+			val->flags = PV_VAL_STR;
+			val->ri = 0;
+			val->rs.s = _kz_tr_buffer;
+
+			if(oldflags & PV_VAL_PKG) {
+				pkg_free(tofree);
+			} else if(oldflags & PV_VAL_SHM) {
+				shm_free(tofree);
+			}
+
+			break;
+
+		default:
+			LM_ERR("unknown kazoo transformation subtype %d\n", subtype);
+			return -1;
+	}
+	return 0;
+}
+
+#define _kz_tr_parse_sparam(_p, _p0, _tp, _spec, _ps, _in, _s) \
+	while(is_in_str(_p, _in) && (*_p==' ' || *_p=='\t' || *_p=='\n')) _p++; \
+	if(*_p==PV_MARKER) \
+	{ /* pseudo-variable */ \
+		_spec = (pv_spec_t*)pkg_malloc(sizeof(pv_spec_t)); \
+		if(_spec==NULL) \
+		{ \
+			LM_ERR("no more private memory!\n"); \
+			goto error; \
+		} \
+		_s.s = _p; _s.len = _in->s + _in->len - _p; \
+		_p0 = pv_parse_spec(&_s, _spec); \
+		if(_p0==NULL) \
+		{ \
+			LM_ERR("invalid spec in substr transformation: %.*s!\n", \
+				_in->len, _in->s); \
+			goto error; \
+		} \
+		_p = _p0; \
+		_tp = (tr_param_t*)pkg_malloc(sizeof(tr_param_t)); \
+		if(_tp==NULL) \
+		{ \
+			LM_ERR("no more private memory!\n"); \
+			goto error; \
+		} \
+		memset(_tp, 0, sizeof(tr_param_t)); \
+		_tp->type = TR_PARAM_SPEC; \
+		_tp->v.data = (void*)_spec; \
+	} else { /* string */ \
+		_ps = _p; \
+		while(is_in_str(_p, _in) && *_p!='\t' && *_p!='\n' \
+				&& *_p!=TR_PARAM_MARKER && *_p!=TR_RBRACKET) \
+				_p++; \
+		if(*_p=='\0') \
+		{ \
+			LM_ERR("invalid param in transformation: %.*s!!\n", \
+				_in->len, _in->s); \
+			goto error; \
+		} \
+		_tp = (tr_param_t*)pkg_malloc(sizeof(tr_param_t)); \
+		if(_tp==NULL) \
+		{ \
+			LM_ERR("no more private memory!\n"); \
+			goto error; \
+		} \
+		memset(_tp, 0, sizeof(tr_param_t)); \
+		_tp->type = TR_PARAM_STRING; \
+		_tp->v.s.s = _ps; \
+		_tp->v.s.len = _p - _ps; \
+	}
+
+
+/*!
+ * \brief Helper fuction to parse a kazoo transformation
+ * \param in parsed string
+ * \param t transformation
+ * \return pointer to the end of the transformation in the string - '}', null on error
+ */
+char* kz_tr_parse(str* in, trans_t *t)
+{
+	char *p;
+	char *p0;
+	char *ps;
+	str name;
+	str s;
+	pv_spec_t *spec = NULL;
+	tr_param_t *tp = NULL;
+
+	if(in==NULL || t==NULL)
+		return NULL;
+
+	p = in->s;
+	name.s = in->s;
+	t->type = TR_KAZOO;
+	t->trf = kz_tr_eval;
+
+	/* find next token */
+	while(is_in_str(p, in) && *p!=TR_PARAM_MARKER && *p!=TR_RBRACKET) p++;
+	if(*p=='\0')
+	{
+		LM_ERR("invalid transformation: %.*s\n",
+				in->len, in->s);
+		goto error;
+	}
+	name.len = p - name.s;
+	trim(&name);
+
+	if(name.len==6 && strncasecmp(name.s, "encode", 6)==0)
+	{
+		t->subtype = TR_KAZOO_ENCODE;
+		goto done;
+	} else if(name.len==4 && strncasecmp(name.s, "json", 4)==0) {
+		t->subtype = TR_KAZOO_JSON;
+		if(*p!=TR_PARAM_MARKER)
+		{
+			LM_ERR("invalid json transformation: %.*s!\n", in->len, in->s);
+			goto error;
+		}
+		p++;
+		_kz_tr_parse_sparam(p, p0, tp, spec, ps, in, s);
+		t->params = tp;
+		tp = 0;
+		while(*p && (*p==' ' || *p=='\t' || *p=='\n')) p++;
+		if(*p!=TR_RBRACKET)
+		{
+			LM_ERR("invalid json transformation: %.*s!!\n",
+				in->len, in->s);
+			goto error;
+		}
+		goto done;
+	}
+
+	LM_ERR("unknown kazoo transformation: %.*s/%.*s/%d!\n", in->len, in->s,
+			name.len, name.s, name.len);
+error:
+	if(tp)
+		tr_param_free(tp);
+	if(spec)
+		pv_spec_free(spec);
+	return NULL;
+done:
+	t->name = name;
+	return p;
+}
+
+

+ 42 - 0
modules/kazoo/kz_trans.h

@@ -0,0 +1,42 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2007 voice-system.ro
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+/*! \file
+ * \brief Transformations support
+ */
+
+#ifndef _KZ_TRANS_H_
+#define _KZ_TRANS_H_
+
+#include "../../pvar.h"
+
+
+
+enum _kz_tr_type { TR_NONE=0, TR_KAZOO };
+enum _kz_tr_subtype { TR_KAZOO_NONE=0, TR_KAZOO_ENCODE, TR_KAZOO_JSON };
+
+char* kz_tr_parse(str *in, trans_t *tr);
+
+int kz_tr_init_buffers(void);
+
+#endif