Przeglądaj źródła

Merge branch 'lazedo/kazoo'

Luis Azedo 11 lat temu
rodzic
commit
fd573c98c7

+ 40 - 46
modules/db_text/dbt_api.c

@@ -44,16 +44,16 @@ int dbt_use_table(db1_con_t* _h, const str* _t)
 /*
  * Get and convert columns from a result
  */
-static int dbt_get_columns(db1_con_t* _h, db1_res_t* _r)
+static int dbt_get_columns(db1_res_t* _r, dbt_result_p _dres)
 {
 	int col;
 	
-	if (!_h || !_r) {
+	if (!_r || !_dres) {
 		LM_ERR("invalid parameter\n");
 		return -1;
 	}
 	
-	RES_COL_N(_r) = DBT_CON_RESULT(_h)->nrcols;
+	RES_COL_N(_r) = _dres->nrcols;
 	if (!RES_COL_N(_r)) {
 		LM_ERR("no columns\n");
 		return -2;
@@ -78,10 +78,10 @@ static int dbt_get_columns(db1_con_t* _h, db1_res_t* _r)
 		LM_DBG("allocate %d bytes for RES_NAMES[%d] at %p",
 				(int)sizeof(str), col,
 				RES_NAMES(_r)[col]);
-		RES_NAMES(_r)[col]->s = DBT_CON_RESULT(_h)->colv[col].name.s;
-		RES_NAMES(_r)[col]->len = DBT_CON_RESULT(_h)->colv[col].name.len;
+		RES_NAMES(_r)[col]->s = _dres->colv[col].name.s;
+		RES_NAMES(_r)[col]->len = _dres->colv[col].name.len;
 
-		switch(DBT_CON_RESULT(_h)->colv[col].type)
+		switch(_dres->colv[col].type)
 		{
 			case DB1_STR:
 			case DB1_STRING:
@@ -89,12 +89,12 @@ static int dbt_get_columns(db1_con_t* _h, db1_res_t* _r)
 			case DB1_INT:
 			case DB1_DATETIME:
 			case DB1_DOUBLE:
-				RES_TYPES(_r)[col] = DBT_CON_RESULT(_h)->colv[col].type;
+				RES_TYPES(_r)[col] = _dres->colv[col].type;
 			break;
 			default:
 				LM_WARN("unhandled data type column (%.*s) type id (%d), "
 						"use STR as default\n", RES_NAMES(_r)[col]->len,
-						RES_NAMES(_r)[col]->s, DBT_CON_RESULT(_h)->colv[col].type);
+						RES_NAMES(_r)[col]->s, _dres->colv[col].type);
 				RES_TYPES(_r)[col] = DB1_STR;
 			break;
 		}
@@ -105,10 +105,10 @@ static int dbt_get_columns(db1_con_t* _h, db1_res_t* _r)
 /*
  * Convert a row from result into db API representation
  */
-static int dbt_convert_row(db1_con_t* _h, db1_res_t* _res, db_row_t* _r)
+static int dbt_convert_row(db1_res_t* _res, db_row_t* _r, dbt_row_p _r1)
 {
 	int i;
-	if (!_h || !_r || !_res) {
+	if (!_r || !_res || !_r1) {
 		LM_ERR("invalid parameter value\n");
 		return -1;
 	}
@@ -119,12 +119,12 @@ static int dbt_convert_row(db1_con_t* _h, db1_res_t* _res, db_row_t* _r)
 	}
 
 	for(i = 0; i < RES_COL_N(_res); i++) {
-		(ROW_VALUES(_r)[i]).nul = DBT_CON_ROW(_h)->fields[i].nul;
+		(ROW_VALUES(_r)[i]).nul = _r1->fields[i].nul;
 		switch(RES_TYPES(_res)[i])
 		{
 			case DB1_INT:
 				VAL_INT(&(ROW_VALUES(_r)[i])) = 
-						DBT_CON_ROW(_h)->fields[i].val.int_val;
+						_r1->fields[i].val.int_val;
 				VAL_TYPE(&(ROW_VALUES(_r)[i])) = DB1_INT;
 			break;
 
@@ -134,46 +134,46 @@ static int dbt_convert_row(db1_con_t* _h, db1_res_t* _res, db_row_t* _r)
 
 			case DB1_DOUBLE:
 				VAL_DOUBLE(&(ROW_VALUES(_r)[i])) = 
-						DBT_CON_ROW(_h)->fields[i].val.double_val;
+						_r1->fields[i].val.double_val;
 				VAL_TYPE(&(ROW_VALUES(_r)[i])) = DB1_DOUBLE;
 			break;
 
 			case DB1_STRING:
 				VAL_STR(&(ROW_VALUES(_r)[i])).s = 
-						DBT_CON_ROW(_h)->fields[i].val.str_val.s;
+						_r1->fields[i].val.str_val.s;
 				VAL_STR(&(ROW_VALUES(_r)[i])).len =
-						DBT_CON_ROW(_h)->fields[i].val.str_val.len;
+						_r1->fields[i].val.str_val.len;
 				VAL_TYPE(&(ROW_VALUES(_r)[i])) = DB1_STRING;
 				VAL_FREE(&(ROW_VALUES(_r)[i])) = 0;
 			break;
 
 			case DB1_STR:
 				VAL_STR(&(ROW_VALUES(_r)[i])).s = 
-						DBT_CON_ROW(_h)->fields[i].val.str_val.s;
+						_r1->fields[i].val.str_val.s;
 				VAL_STR(&(ROW_VALUES(_r)[i])).len =
-						DBT_CON_ROW(_h)->fields[i].val.str_val.len;
+						_r1->fields[i].val.str_val.len;
 				VAL_TYPE(&(ROW_VALUES(_r)[i])) = DB1_STR;
 				VAL_FREE(&(ROW_VALUES(_r)[i])) = 0;
 			break;
 
 			case DB1_DATETIME:
 				VAL_INT(&(ROW_VALUES(_r)[i])) = 
-						DBT_CON_ROW(_h)->fields[i].val.int_val;
+						_r1->fields[i].val.int_val;
 				VAL_TYPE(&(ROW_VALUES(_r)[i])) = DB1_DATETIME;
 			break;
 
 			case DB1_BLOB:
 				VAL_STR(&(ROW_VALUES(_r)[i])).s =
-						DBT_CON_ROW(_h)->fields[i].val.str_val.s;
+						_r1->fields[i].val.str_val.s;
 				VAL_STR(&(ROW_VALUES(_r)[i])).len =
-						DBT_CON_ROW(_h)->fields[i].val.str_val.len;
+						_r1->fields[i].val.str_val.len;
 				VAL_TYPE(&(ROW_VALUES(_r)[i])) = DB1_BLOB;
 				VAL_FREE(&(ROW_VALUES(_r)[i])) = 0;
 			break;
 
 			case DB1_BITMAP:
 				VAL_INT(&(ROW_VALUES(_r)[i])) =
-					DBT_CON_ROW(_h)->fields[i].val.bitmap_val;
+						_r1->fields[i].val.bitmap_val;
 				VAL_TYPE(&(ROW_VALUES(_r)[i])) = DB1_INT;
 			break;
 
@@ -189,15 +189,15 @@ static int dbt_convert_row(db1_con_t* _h, db1_res_t* _res, db_row_t* _r)
 /*
  * Convert rows from internal to db API representation
  */
-static int dbt_convert_rows(db1_con_t* _h, db1_res_t* _r)
+static int dbt_convert_rows(db1_res_t* _r, dbt_result_p _dres)
 {
-	int col;
+	int row;
 	dbt_row_p _rp = NULL;
-	if (!_h || !_r) {
+	if (!_r || !_dres) {
 		LM_ERR("invalid parameter\n");
 		return -1;
 	}
-	RES_ROW_N(_r) = DBT_CON_RESULT(_h)->nrrows;
+	RES_ROW_N(_r) = _dres->nrrows;
 	if (!RES_ROW_N(_r)) {
 		return 0;
 	}
@@ -205,23 +205,16 @@ static int dbt_convert_rows(db1_con_t* _h, db1_res_t* _r)
 		LM_ERR("could not allocate rows");
 		return -2;
 	}
-	col = 0;
-	_rp = DBT_CON_RESULT(_h)->rows;
+	row = 0;
+	_rp = _dres->rows;
 	while(_rp) {
-		DBT_CON_ROW(_h) = _rp;
-		if (!DBT_CON_ROW(_h)) {
-			LM_ERR("failed to get current row\n");
-			RES_ROW_N(_r) = col;
-			db_free_rows(_r);
-			return -3;
-		}
-		if (dbt_convert_row(_h, _r, &(RES_ROWS(_r)[col])) < 0) {
-			LM_ERR("failed to convert row #%d\n", col);
-			RES_ROW_N(_r) = col;
+		if (dbt_convert_row(_r, &(RES_ROWS(_r)[row]), _rp) < 0) {
+			LM_ERR("failed to convert row #%d\n", row);
+			RES_ROW_N(_r) = row;
 			db_free_rows(_r);
 			return -4;
 		}
-		col++;
+		row++;
 		_rp = _rp->next;
 	}
 	return 0;
@@ -231,18 +224,18 @@ static int dbt_convert_rows(db1_con_t* _h, db1_res_t* _r)
 /*
  * Fill the structure with data from database
  */
-static int dbt_convert_result(db1_con_t* _h, db1_res_t* _r)
+static int dbt_convert_result(db1_res_t* _r, dbt_result_p _dres)
 {
-	if (!_h || !_r) {
+	if (!_r || !_dres) {
 		LM_ERR("invalid parameter\n");
 		return -1;
 	}
-	if (dbt_get_columns(_h, _r) < 0) {
+	if (dbt_get_columns(_r, _dres) < 0) {
 		LM_ERR("failed to get column names\n");
 		return -2;
 	}
 
-	if (dbt_convert_rows(_h, _r) < 0) {
+	if (dbt_convert_rows(_r, _dres) < 0) {
 		LM_ERR("failed to convert rows\n");
 		db_free_columns(_r);
 		return -3;
@@ -253,14 +246,14 @@ static int dbt_convert_result(db1_con_t* _h, db1_res_t* _r)
 /*
  * Retrieve result set
  */
-int dbt_get_result(db1_con_t* _h, db1_res_t** _r)
+int dbt_get_result(db1_res_t** _r, dbt_result_p _dres)
 {
-	if (!_h || !_r) {
+	if ( !_r) {
 		LM_ERR("invalid parameter value\n");
 		return -1;
 	}
 
-	if (!DBT_CON_RESULT(_h))
+	if (!_dres)
 	{
 		LM_ERR("failed to get result\n");
 		*_r = 0;
@@ -274,12 +267,13 @@ int dbt_get_result(db1_con_t* _h, db1_res_t** _r)
 		return -2;
 	}
 
-	if (dbt_convert_result(_h, *_r) < 0) 
+	if (dbt_convert_result(*_r, _dres) < 0)
 	{
 		LM_ERR("failed to convert result\n");
 		pkg_free(*_r);
 		return -4;
 	}
 	
+	(*_r)->ptr = _dres;
 	return 0;
 }

+ 2 - 1
modules/db_text/dbt_api.h

@@ -36,10 +36,11 @@
 #include "../../lib/srdb1/db_con.h"
 #include "../../lib/srdb1/db_row.h"
 
+#include "dbt_res.h"
 /*
  * Retrieve result set
  */
-int dbt_get_result(db1_con_t* _h, db1_res_t** _r);
+int dbt_get_result(db1_res_t** _r, dbt_result_p _dres);
 
 int dbt_use_table(db1_con_t* _h, const str* _t);
 

+ 14 - 13
modules/db_text/dbt_base.c

@@ -119,9 +119,6 @@ void dbt_close(db1_con_t* _h)
 		return;
 	}
 	
-	if (DBT_CON_RESULT(_h)) 
-		dbt_result_free(DBT_CON_RESULT(_h));
-	
 	pkg_free(_h);
     return;
 }
@@ -138,19 +135,18 @@ int dbt_free_result(db1_con_t* _h, db1_res_t* _r)
 		return -1;
 	}
 
-	if(db_free_result(_r) < 0) 
+
+	if(dbt_result_free((dbt_result_p)_r->ptr) < 0)
 	{
-		LM_ERR("unable to free result structure\n");
-		return -1;
+		LM_ERR("unable to free internal structure\n");
 	}
 
-	
-	if(dbt_result_free(DBT_CON_RESULT(_h)) < 0) 
+	if(db_free_result(_r) < 0) 
 	{
-		LM_ERR("unable to free internal structure\n");
+		LM_ERR("unable to free result structure\n");
 		return -1;
 	}
-	DBT_CON_RESULT(_h) = NULL;
+
 	return 0;
 }
 
@@ -173,6 +169,7 @@ int dbt_query(db1_con_t* _h, db_key_t* _k, db_op_t* _op, db_val_t* _v,
 	dbt_table_p _tbc = NULL;
 	dbt_row_p _drp = NULL;
 	dbt_result_p _dres = NULL;
+	int result = 0;
 	
 	int *lkey=NULL, *lres=NULL;
 	
@@ -273,8 +270,6 @@ int dbt_query(db1_con_t* _h, db_key_t* _k, db_op_t* _op, db_val_t* _v,
 
 	/* dbt_result_print(_dres); */
 	
-	DBT_CON_RESULT(_h) = _dres;
-	
 	if(lkey)
 		pkg_free(lkey);
 	if(lres)
@@ -286,7 +281,11 @@ int dbt_query(db1_con_t* _h, db_key_t* _k, db_op_t* _op, db_val_t* _v,
  	if(_o_l)
  		pkg_free(_o_l);
 
-	return dbt_get_result(_h, _r);
+	result = dbt_get_result(_r, _dres);
+	if(result != 0)
+		dbt_result_free(_dres);
+
+	return result;
 
 error:
 	/* unlock database */
@@ -302,6 +301,8 @@ error_nounlock:
 		pkg_free(_o_op);
 	if(_o_l)
 		pkg_free(_o_l);
+	if(_dres)
+		dbt_result_free(_dres);
 	LM_ERR("failed to query the table!\n");
 
 	return -1;

+ 3 - 1
modules/db_text/dbt_res.c

@@ -119,7 +119,9 @@ int dbt_result_free(dbt_result_p _dres)
 			for(i=0; i<_dres->nrcols; i++)
 			{
 				if((_dres->colv[i].type==DB1_STR 
-							|| _dres->colv[i].type==DB1_STRING)
+							|| _dres->colv[i].type==DB1_STRING
+							|| _dres->colv[i].type==DB1_BLOB
+							)
 						&& _rp0->fields[i].val.str_val.s)
 					pkg_free(_rp0->fields[i].val.str_val.s);
 			}

+ 0 - 6
modules/db_text/dbt_res.h

@@ -44,18 +44,12 @@ typedef struct _dbt_result
 	dbt_row_p rows;
 } dbt_result_t, *dbt_result_p;
 
-//typedef db1_res_t dbt_result_t, *dbt_result_p;
-
 typedef struct _dbt_con
 {
 	dbt_cache_p con;
-	dbt_result_p res;
-	dbt_row_p row;
 } dbt_con_t, *dbt_con_p;
 
 #define DBT_CON_CONNECTION(db_con) (((dbt_con_p)((db_con)->tail))->con)
-#define DBT_CON_RESULT(db_con)     (((dbt_con_p)((db_con)->tail))->res)
-#define DBT_CON_ROW(db_con)        (((dbt_con_p)((db_con)->tail))->row)
 
 dbt_result_p dbt_result_new(dbt_table_p, int*, int);
 int dbt_result_free(dbt_result_p);

+ 2 - 5
modules/kazoo/Makefile

@@ -6,14 +6,11 @@ include ../../Makefile.defs
 auto_gen=
 NAME=kazoo.so
 
-DEFS+=-I/usr/local/include -DKAMAILIO_MOD_INTERFACE
-LIBS=-L/usr/local/lib -lrabbitmq -ljson
+DEFS+=-I$(LOCALBASE)/include -I$(SYSBASE)/include -DKAMAILIO_MOD_INTERFACE
+LIBS=-L$(LOCALBASE)/lib -I$(SYSBASE)/lib -lrabbitmq -ljson -luuid
 
 DEFS += -DSER_MOD_INTERFACE
 
-ifeq ($(CROSS_COMPILE),)
-XML2CFG=$(shell which xml2-config)
-endif
 
 SERLIBPATH=../../lib
 SER_LIBS=$(SERLIBPATH)/srdb2/srdb2 $(SERLIBPATH)/srdb1/srdb1

+ 795 - 2
modules/kazoo/README

@@ -1,6 +1,6 @@
 KAZOO Module
 
-2600hz
+2600hz Inc.
 
    <[email protected]>
 
@@ -10,8 +10,801 @@ Luis Azedo
 
    <[email protected]>
 
+   Copyright © 2010, 2014 2600hz
      __________________________________________________________________
 
    Table of Contents
 
-Documentation to come in the next days
+   1. Admin Guide
+
+        1. Overview
+        2. How it works
+
+              2.1. event routes
+              2.2. aknowledge messages
+
+        3. Dependencies
+
+              3.1. Kamailio Modules
+              3.2. External Libraries or Applications
+
+        4. Parameters
+
+              4.1. amqp related
+
+                    4.1.1. node_hostname(str)
+                    4.1.2. amqp_consumer_processes(int)
+                    4.1.3. amqp_consumer_event_key(str)
+                    4.1.4. amqp_consumer_event_subkey(str)
+                    4.1.5. amqp_max_channels(str)
+                    4.1.6. amqp_connection(str)
+
+              4.2. execution control
+
+                    4.2.1. amqp_consumer_loop_count(int)
+                    4.2.2. amqp_internal_loop_count(int)
+                    4.2.3. amqp_consumer_ack_loop_count(int)
+                    4.2.4. consume_messages_on_reconnect(int)
+                    4.2.5. single_consumer_on_reconnect(int)
+
+              4.3. timers
+
+                    4.3.1. amqp_consumer_ack_timeout(str)
+                    4.3.2. amqp_interprocess_timeout(str)
+                    4.3.3. amqp_waitframe_timout(str)
+                    4.3.4. amqp_query_timout(str)
+
+              4.4. presence related
+
+                    4.4.1. db_url(str)
+                    4.4.2. presentity_table(str)
+                    4.4.3. dialog_expires(str)
+                    4.4.4. presence_expires(str)
+                    4.4.5. mwi_expires(str)
+
+        5. Functions
+
+              5.1. amqp related
+
+                    5.1.1. kazoo_publish(exchange, routing_key,
+                            json_payload)
+
+                    5.1.2. kazoo_query(exchange, routing_key, json_payload
+                            [, target_var])
+
+                    5.1.3. kazoo_subscribe(exchange, exchange_type, queue,
+                            routing_key)
+
+                    5.1.4. kazoo_subscribe(json_description)
+
+              5.2. presence related
+
+                    5.2.1. kazoo_pua_publish(json_payload)
+
+              5.3. other
+
+                    5.3.1. kazoo_encode(to_encode, target_var)
+                    5.3.2. kazoo_json(json_payload, field, target_var)
+
+        6. Exported pseudo-variables
+        7. Transformations
+
+   List of Examples
+
+   1.1. define the event route
+   1.2. Set node_hostname parameter
+   1.3. Set amqp_consumer_processes parameter
+   1.4. Set amqp_consumer_event_key parameter
+   1.5. Set amqp_consumer_event_subkey parameter
+   1.6. Set amqp_max_channels parameter
+   1.7. Set amqp_connection parameter
+   1.8. Set amqp_consumer_loop_count parameter
+   1.9. Set amqp_internal_loop_count parameter
+   1.10. Set amqp_consumer_ack_loop_count parameter
+   1.11. Set consume_messages_on_reconnect parameter
+   1.12. Set single_consumer_on_reconnect parameter
+   1.13. Set amqp_consumer_ack_timeout parameter
+   1.14. Set amqp_interprocess_timeout parameter
+   1.15. Set amqp_waitframe_timout parameter
+   1.16. Set amqp_query_timout parameter
+   1.17. Set db_url parameter
+   1.18. Set presentity_table parameter
+   1.19. Set dialog_expires parameter
+   1.20. Set presence_expires parameter
+   1.21. Set mwi_expires parameter
+   1.22. kazoo_publish usage
+   1.23. kazoo_query usage
+   1.24. kazoo_subscribe usage
+   1.25. kazoo_subscribe usage
+   1.26. kazoo_pua_publish usage
+   1.27. kazoo_encode usage
+   1.28. kazoo_json usage
+   1.29. kz.json usage
+   1.30. kz.encode usage
+
+Chapter 1. Admin Guide
+
+   Table of Contents
+
+   1. Overview
+   2. How it works
+
+        2.1. event routes
+        2.2. aknowledge messages
+
+   3. Dependencies
+
+        3.1. Kamailio Modules
+        3.2. External Libraries or Applications
+
+   4. Parameters
+
+        4.1. amqp related
+
+              4.1.1. node_hostname(str)
+              4.1.2. amqp_consumer_processes(int)
+              4.1.3. amqp_consumer_event_key(str)
+              4.1.4. amqp_consumer_event_subkey(str)
+              4.1.5. amqp_max_channels(str)
+              4.1.6. amqp_connection(str)
+
+        4.2. execution control
+
+              4.2.1. amqp_consumer_loop_count(int)
+              4.2.2. amqp_internal_loop_count(int)
+              4.2.3. amqp_consumer_ack_loop_count(int)
+              4.2.4. consume_messages_on_reconnect(int)
+              4.2.5. single_consumer_on_reconnect(int)
+
+        4.3. timers
+
+              4.3.1. amqp_consumer_ack_timeout(str)
+              4.3.2. amqp_interprocess_timeout(str)
+              4.3.3. amqp_waitframe_timout(str)
+              4.3.4. amqp_query_timout(str)
+
+        4.4. presence related
+
+              4.4.1. db_url(str)
+              4.4.2. presentity_table(str)
+              4.4.3. dialog_expires(str)
+              4.4.4. presence_expires(str)
+              4.4.5. mwi_expires(str)
+
+   5. Functions
+
+        5.1. amqp related
+
+              5.1.1. kazoo_publish(exchange, routing_key, json_payload)
+              5.1.2. kazoo_query(exchange, routing_key, json_payload [,
+                      target_var])
+
+              5.1.3. kazoo_subscribe(exchange, exchange_type, queue,
+                      routing_key)
+
+              5.1.4. kazoo_subscribe(json_description)
+
+        5.2. presence related
+
+              5.2.1. kazoo_pua_publish(json_payload)
+
+        5.3. other
+
+              5.3.1. kazoo_encode(to_encode, target_var)
+              5.3.2. kazoo_json(json_payload, field, target_var)
+
+   6. Exported pseudo-variables
+   7. Transformations
+
+1. Overview
+
+   The Kazoo is a general purpose AMQP connector (tested with
+   rabbitmq-server). It exposes publish/consume capabilities into
+   Kamailio.
+
+   From a high-level, the purpose of the module might be for things like:
+     * Integrate to an AMQP application to make real-time routing
+       decisions (instead of using, say, a SQL database)
+     * Provide a real-time integration into your program, instead of your
+       database, so you can overlay additional logic in your preferred
+       language while also utilizing a message bus
+     * Utilize messaging to have a distributed messaging layer, such that
+       machines processing requests/responses/events can go up/down or
+       share the workload and your Kamailio node will still be happy
+
+   supported operations are:
+     * publish json payloads to rabbitmq
+     * publish json payloads to rabbitmq and wait for correlated response
+       message
+     * subscribe to an exchange with a routing key
+
+   The Kazoo module also has support to publish updates to presence module
+   thru the kazoo_pua_publish function
+
+2. How it works
+
+   2.1. event routes
+   2.2. aknowledge messages
+
+   The module works with a main forked process that does the communication
+   with rabbitmq for issuing publishes, waiting for replies and consuming
+   messages. When it consumes a message it defers the process to a worker
+   process so that it doesn't block this main process.
+
+2.1. event routes
+
+   The worker process issues an event-route where we can act on the
+   received payload. The name of the event-route is composed by values
+   extracted from the payload.
+
+   Kazoo module will try to execute the event route from most significant
+   to less significant. define the event route like
+   event_route[kazoo:consumer-event[-payload_key_value[-payload_subkey_val
+   ue]]]
+
+   we can set the key/subkey pair on a subscription base. check the
+   payload on subscribe.
+
+   Example 1.1. define the event route
+...
+modparam("kazoo", "amqp_consumer_event_key", "Event-Category")
+modparam("kazoo", "amqp_consumer_event_subkey", "Event-Name")
+...
+
+event_route[kazoo:consumer-event-presence-update]
+{
+# presence is the value extracted from Event-Category field in json payload
+# update is the value extracted from Event-Name field in json payload
+xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json
+,From})");
+...
+}
+
+event_route[kazoo:consumer-event-presence]
+{
+# presence is the value extracted from Event-Category field in json payload
+xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json
+,From})");
+...
+}
+
+event_route[kazoo:consumer-event-event-category-event-name]
+{
+# event-category is the name of the amqp_consumer_event_key parameter
+# event-name is the name of the amqp_consumer_event_subkey parameter
+# this event route is executed if we can't find the previous
+...
+}
+
+event_route[kazoo:consumer-event-event-category]
+{
+# event-category is the name of the amqp_consumer_event_key parameter
+# this event route is executed if we can't find the previous
+...
+}
+
+event_route[kazoo:consumer-event]
+{
+# this event route is executed if we can't find the previous
+}
+
+2.2. aknowledge messages
+
+   Consumed messages have the option of being acknowledge in two ways:
+     * immediately when received
+     * after processing by the worker
+
+3. Dependencies
+
+   3.1. Kamailio Modules
+   3.2. External Libraries or Applications
+
+3.1. Kamailio Modules
+
+   The following modules must be loaded before this module:
+     * none.
+
+3.2. External Libraries or Applications
+
+     * librabbitmq.
+     * libjson.
+     * libuuid.
+
+4. Parameters
+
+   4.1. amqp related
+
+        4.1.1. node_hostname(str)
+        4.1.2. amqp_consumer_processes(int)
+        4.1.3. amqp_consumer_event_key(str)
+        4.1.4. amqp_consumer_event_subkey(str)
+        4.1.5. amqp_max_channels(str)
+        4.1.6. amqp_connection(str)
+
+   4.2. execution control
+
+        4.2.1. amqp_consumer_loop_count(int)
+        4.2.2. amqp_internal_loop_count(int)
+        4.2.3. amqp_consumer_ack_loop_count(int)
+        4.2.4. consume_messages_on_reconnect(int)
+        4.2.5. single_consumer_on_reconnect(int)
+
+   4.3. timers
+
+        4.3.1. amqp_consumer_ack_timeout(str)
+        4.3.2. amqp_interprocess_timeout(str)
+        4.3.3. amqp_waitframe_timout(str)
+        4.3.4. amqp_query_timout(str)
+
+   4.4. presence related
+
+        4.4.1. db_url(str)
+        4.4.2. presentity_table(str)
+        4.4.3. dialog_expires(str)
+        4.4.4. presence_expires(str)
+        4.4.5. mwi_expires(str)
+
+4.1. amqp related
+
+4.1.1. node_hostname(str)
+
+   The name of this host to register in rabbitmq.
+
+   Default value is NULL. you must set this parameter value for the module
+   to work
+
+   Example 1.2. Set node_hostname parameter
+...
+modparam("kazoo", "node_hostname", "sipproxy.mydomain.com")
+...
+
+4.1.2. amqp_consumer_processes(int)
+
+   The number of worker processes to handle messages consumption.
+
+   Default value is 4.
+
+   Example 1.3. Set amqp_consumer_processes parameter
+...
+modparam("kazoo", "amqp_consumer_processes", 10)
+...
+
+4.1.3. amqp_consumer_event_key(str)
+
+   The default name of the field in json payload to compose the event name
+   1st part
+
+   Default value is “Event-Category”.
+
+   Example 1.4. Set amqp_consumer_event_key parameter
+...
+modparam("kazoo", "amqp_consumer_event_key", "My-JSON-Field-Name")
+...
+
+4.1.4. amqp_consumer_event_subkey(str)
+
+   The default name of the field in json payload to compose the event name
+   2nd part
+
+   Default value is “Event-Name”.
+
+   Example 1.5. Set amqp_consumer_event_subkey parameter
+...
+modparam("kazoo", "amqp_consumer_event_subkey", "My-JSON-SubField-Name")
+...
+
+4.1.5. amqp_max_channels(str)
+
+   The number of pre allocated channels for the connection.
+
+   Default value is 50.
+
+   Example 1.6. Set amqp_max_channels parameter
+...
+modparam("kazoo", "amqp_max_channels", 100)
+...
+
+4.1.6. amqp_connection(str)
+
+   The connection url to rabbitmq. can be set multiple times for failover.
+
+   Example 1.7. Set amqp_connection parameter
+...
+modparam("kazoo", "amqp_connection", "amqp://guest:guest@localhost:5672")
+modparam("kazoo", "amqp_connection", "kazoo://guest:guest@otherhost:5672")
+...
+
+4.2. execution control
+
+   execution control of main loop can be controlled by changing the
+   parameter values in this section.
+
+   The main loop has 3 sub-loops were it listen for actions to execute
+   with a timeout. These group of parameters allow to set the maximum
+   number of times the sub-loop is executed if it doesn't timeout.
+
+   On busy systems, we may have a condition where a sub-loop never times
+   out because it always has data to process. The purpose of these
+   parameters is to set a maximum number of times it executes before it
+   handles control to the next sub-loop.
+...
+while(true) // main  loop
+ while(ACK or timeout)  // acknowledge from worker process
+ while(SEND or timeout) // anything to send ?
+ while(CONSUME or timeout) // any data on consumed exchanges ?
+...
+
+4.2.1. amqp_consumer_loop_count(int)
+
+   The consumer loop count.
+
+   Default value is 10.
+
+   Example 1.8. Set amqp_consumer_loop_count parameter
+...
+modparam("kazoo", "amqp_consumer_loop_count", 3)
+...
+
+4.2.2. amqp_internal_loop_count(int)
+
+   The internal listen for commands loop count.
+
+   Default value is 5.
+
+   Example 1.9. Set amqp_internal_loop_count parameter
+...
+modparam("kazoo", "amqp_internal_loop_count", 1)
+...
+
+4.2.3. amqp_consumer_ack_loop_count(int)
+
+   The work ack loop count.
+
+   Default value is 20.
+
+   Example 1.10. Set amqp_consumer_ack_loop_count parameter
+...
+modparam("kazoo", "amqp_consumer_ack_loop_count", 5)
+...
+
+4.2.4. consume_messages_on_reconnect(int)
+
+   This parameter indicates if the module ignores the loop counters on
+   reconnect and consumes all the pending messages ready to be consumed.
+
+   Default value is 1.
+
+   Example 1.11. Set consume_messages_on_reconnect parameter
+...
+modparam("kazoo", "consume_messages_on_reconnect", 0)
+...
+
+4.2.5. single_consumer_on_reconnect(int)
+
+   When the main loop receives a message from rabbitmq it will defer the
+   execution to a worker in a round-robin manner. this parameter allows to
+   use the same worker when kazoo reconnects to rabbitmq.
+
+   Default value is 1.
+
+   Example 1.12. Set single_consumer_on_reconnect parameter
+...
+modparam("kazoo", "single_consumer_on_reconnect", 0)
+...
+
+4.3. timers
+
+   each functional parameter related to timers come with 2 reflected
+   parameters. name_sec and name_micro
+
+4.3.1. amqp_consumer_ack_timeout(str)
+
+   Timeout when checking for aknowledge from workers.
+
+   Default value is 100000 micro.
+
+   Example 1.13. Set amqp_consumer_ack_timeout parameter
+...
+modparam("kazoo", "amqp_consumer_ack_timeout_sec", 1)
+modparam("kazoo", "amqp_consumer_ack_timeout_micro", 200000)
+...
+
+4.3.2. amqp_interprocess_timeout(str)
+
+   Timeout when checking for commands (publish/query) for sending to
+   rabbitmq.
+
+   Default value is 100000 micro.
+
+   Example 1.14. Set amqp_interprocess_timeout parameter
+...
+modparam("kazoo", "amqp_interprocess_timeout_sec", 1)
+modparam("kazoo", "amqp_interprocess_timeout_micro", 200000)
+...
+
+4.3.3. amqp_waitframe_timout(str)
+
+   Timeout when checking for messages from rabbitmq.
+
+   Default value is 100000 micro.
+
+   Example 1.15. Set amqp_waitframe_timout parameter
+...
+modparam("kazoo", "amqp_waitframe_timout_sec", 1)
+modparam("kazoo", "amqp_waitframe_timout_micro", 200000)
+...
+
+4.3.4. amqp_query_timout(str)
+
+   Timeout when checking for reply messages from rabbitmq for kazoo_query
+   commands.
+
+   Default value is 2 sec.
+
+   Example 1.16. Set amqp_query_timout parameter
+...
+modparam("kazoo", "amqp_query_timout_sec", 1)
+modparam("kazoo", "amqp_query_timout_micro", 200000)
+...
+
+4.4. presence related
+
+4.4.1. db_url(str)
+
+   The database url.
+
+   If set, the module is a fully operational presence server. Otherwise,
+   it is used as a 'library', for its exported functions.
+
+   Default value is “NULL”.
+
+   Example 1.17. Set db_url parameter
+...
+modparam("kazoo", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio")
+...
+
+4.4.2. presentity_table(str)
+
+   The name of the presentity table in the database.
+
+   Default value is “presentity”.
+
+   Example 1.18. Set presentity_table parameter
+...
+modparam("kazoo", "presentity_table", "my_presentity_table")
+...
+
+4.4.3. dialog_expires(str)
+
+   The default Expires value for dialog event.
+
+   Default value is 30.
+
+   Example 1.19. Set dialog_expires parameter
+...
+modparam("kazoo", "dialog_expires", 3600)
+...
+
+4.4.4. presence_expires(str)
+
+   The default Expires value for presence event.
+
+   Default value is 3600.
+
+   Example 1.20. Set presence_expires parameter
+...
+modparam("kazoo", "presence_expires", 600)
+...
+
+4.4.5. mwi_expires(str)
+
+   The default Expires value for message-summary (mwi) event.
+
+   Default value is 3600.
+
+   Example 1.21. Set mwi_expires parameter
+...
+modparam("kazoo", "mwi_expires", 600)
+...
+
+5. Functions
+
+   5.1. amqp related
+
+        5.1.1. kazoo_publish(exchange, routing_key, json_payload)
+        5.1.2. kazoo_query(exchange, routing_key, json_payload [,
+                target_var])
+
+        5.1.3. kazoo_subscribe(exchange, exchange_type, queue,
+                routing_key)
+
+        5.1.4. kazoo_subscribe(json_description)
+
+   5.2. presence related
+
+        5.2.1. kazoo_pua_publish(json_payload)
+
+   5.3. other
+
+        5.3.1. kazoo_encode(to_encode, target_var)
+        5.3.2. kazoo_json(json_payload, field, target_var)
+
+5.1. amqp related
+
+5.1.1.  kazoo_publish(exchange, routing_key, json_payload)
+
+   The function publishes a json payload to rabbitmq. The routing_key
+   parameter should be encoded.
+
+   This function can be used from ANY ROUTE.
+
+   Example 1.22. kazoo_publish usage
+...
+$var(amqp_payload_request) = "{'Event-Category' : 'directory', 'Event-Name' : '
+reg_success', 'Contact' : '" + $var(fs_contact) + "', 'Call-ID' : '" + $ci + "'
+, 'Realm' : '" + $fd +"', 'Username' : '" + $fU + "', 'From-User' : '" + $fU +
+"', 'From-Host' : '" + $fd + "', 'To-User' : '" + $tU +"', 'To-Host' : '" + $td
+ + "', 'User-Agent' : '" + $ua +"' ," + $var(register_contants)+ " }";
+$var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $fU
+;
+kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request));
+...
+
+5.1.2.  kazoo_query(exchange, routing_key, json_payload [, target_var])
+
+   The function publishes a json payload to rabbitmq, waits for a
+   correlated messageand puts the result in target_var. The routing_key
+   parameter should be encoded. target_var is optional as the function
+   also puts the result in pseudo-variable $kzR.
+
+   This function can be used from ANY ROUTE.
+
+   Example 1.23. kazoo_query usage
+...
+$var(amqp_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' :
+ 'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "'
+, 'Active-Only' : false }";
+kazoo_encode("$ci", "$var(callid_encoded)");
+$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
+if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request), "
+$var(amqp_result)")) {
+   kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
+   if($du != $null) {
+       xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
+       return;
+   }
+}
+...
+
+5.1.3.  kazoo_subscribe(exchange, exchange_type, queue, routing_key)
+
+   The function subscribes to exchange/type on queue with routing_key. The
+   routing_key parameter should be encoded.
+
+   This function must be called from event_route[kazoo:mod-init].
+
+   Example 1.24. kazoo_subscribe usage
+...
+event_route[kazoo:mod-init]
+{
+   kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOS
+TNAME");
+}
+
+event_route[kazoo:consumer-event]
+{
+    xlog("L_INFO","Received json payload : $kzE");
+}
+...
+
+5.1.4.  kazoo_subscribe(json_description)
+
+   The function takes additional parameters to the subscribe function.
+
+   json payload fields description
+     * exchange : str, required
+     * type : str, required
+     * queue : str, required
+     * routing : str, required
+     * auto_delete : int, default 1
+     * durable : int, default 0
+     * no_ack : int, default 1
+     * wait_for_consumer_ack : int, default 0
+     * event_key : str, no default
+     * event_subkey : str, no default
+
+   This function must be called from event_route[kazoo:mod-init].
+
+   Example 1.25. kazoo_subscribe usage
+...
+event_route[kazoo:mod-init]
+{
+    $var(payload) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' :
+ 'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'auto_delete' : 0, 'du
+rable' : 1, 'no_ack' : 0, 'wait_for_consumer_ack' : 1 }";
+    kazoo_subscribe("$var(payload)");
+}
+
+event_route[kazoo:consumer-event]
+{
+    xlog("L_INFO","Received json payload : $kzE");
+}
+...
+
+5.2. presence related
+
+5.2.1.  kazoo_pua_publish(json_payload)
+
+   The function build presentity state from json_payload and updates
+   presentity table.
+
+   This function can be used from ANY ROUTE.
+
+   Example 1.26. kazoo_pua_publish usage
+...
+event_route[kazoo:consumer-event-presence-update]
+{
+    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.
+json,From})");
+    kazoo_pua_publish($kzE);
+    pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package}
+)", 1);
+}
+...
+
+5.3. other
+
+5.3.1.  kazoo_encode(to_encode, target_var)
+
+   The function encodes the 1st parameter for amqp and puts the result in
+   the 2nd parameter.
+
+   This function can be used from ANY ROUTE.
+
+   Example 1.27. kazoo_encode usage
+...
+kazoo_encode("$ci", "$var(callid_encoded)");
+$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
+...
+
+5.3.2.  kazoo_json(json_payload, field, target_var)
+
+   The function extracts the value from a json payload and puts the result
+   in the 3rd parameter. It can use nested values for the query part.
+
+   This function can be used from ANY ROUTE.
+
+   Example 1.28. kazoo_json usage
+...
+kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
+if($du != $null) {
+  xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
+  return;
+}
+...
+
+6. Exported pseudo-variables
+
+     * $kzR Contains the payload result of kazoo_query execution.
+     * $kzE Contains the payload of a consumed message
+
+7. Transformations
+
+   The prefix for kazoo transformations is kz.
+     * json
+       Example 1.29. kz.json usage
+...
+#kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
+$du = $kzR{kz.json,Channels[0].switch_url};
+if($du != $null) {
+  xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
+  return;
+}
+...
+     * encode
+       Example 1.30. kz.encode usage
+...
+#kazoo_encode("$ci", "$var(callid_encoded)");
+#$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
+$var(amqp_routing_key) = "call.status_req." + $(ci{kz.encode})
+...

+ 39 - 39
modules/kazoo/defs.h

@@ -48,53 +48,53 @@
 #define PRESENCE_BODY_BUFFER_SIZE 4096
 
 #define MWI_BODY             "Messages-Waiting: %.*s\r\nMessage-Account: %.*s\r\nVoice-Message: %.*s/%.*s (%.*s/%.*s)\r\n"
-#define PRESENCE_BODY        "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
+#define PRESENCE_BODY        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
 <presence xmlns=\"urn:ietf:params:xml:ns:pidf\" xmlns:dm=\"urn:ietf:params:xml:ns:pidf:data-model\" xmlns:rpid=\"urn:ietf:params:xml:ns:pidf:rpid\" xmlns:c=\"urn:ietf:params:xml:ns:pidf:cipid\" entity=\"%s\"> \
-    <tuple xmlns=\"urn:ietf:params:xml:ns:pidf\" id=\"%s\"> \
-        <status> \
-            <basic>%s</basic> \
-        </status> \
-    </tuple> \
-    <note xmlns=\"urn:ietf:params:xml:ns:pidf\">%s</note> \
-    <dm:person xmlns:dm=\"urn:ietf:params:xml:ns:pidf:data-model\" xmlns:rpid=\"urn:ietf:params:xml:ns:pidf:rpid\" id=\"1\"> \
-        <rpid:activities>%s</rpid:activities> \
-        <dm:note>%s</dm:note> \
-    </dm:person> \
+<tuple xmlns=\"urn:ietf:params:xml:ns:pidf\" id=\"%s\">\
+<status>\
+<basic>%s</basic>\
+</status>\
+</tuple>\
+<note xmlns=\"urn:ietf:params:xml:ns:pidf\">%s</note>\
+<dm:person xmlns:dm=\"urn:ietf:params:xml:ns:pidf:data-model\" xmlns:rpid=\"urn:ietf:params:xml:ns:pidf:rpid\" id=\"1\">\
+<rpid:activities>%s</rpid:activities>\
+<dm:note>%s</dm:note>\
+</dm:person>\
 </presence>"
 
-#define DIALOGINFO_EMPTY_BODY "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
+#define DIALOGINFO_EMPTY_BODY "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
 <dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\"> \
-   <dialog direction=\"initiator\"> \
-      <state>terminated</state> \
-   </dialog> \
+<dialog direction=\"initiator\">\
+<state>terminated</state>\
+</dialog>\
 </dialog-info>"
 
-#define DIALOGINFO_BODY "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
-<dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\"> \
-   <dialog id=\"%.*s\" call-id=\"%.*s\" local-tag=\"%.*s\" remote-tag=\"%.*s\" direction=\"%.*s\"> \
-      <state>%.*s</state> \
-      <local> \
-         <identity>%.*s</identity> \
-         <target uri=\"sip:%.*s\"/> \
-      </local> \
-      <remote> \
-         <identity>%.*s</identity> \
-         <target uri=\"sip:%.*s\"/> \
-      </remote> \
-   </dialog> \
+#define DIALOGINFO_BODY "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\">\
+<dialog id=\"%.*s\" call-id=\"%.*s\" local-tag=\"%.*s\" remote-tag=\"%.*s\" direction=\"%.*s\">\
+<state>%.*s</state>\
+<local>\
+<identity>%.*s</identity>\
+<target uri=\"sip:%.*s\"/>\
+</local>\
+<remote>\
+<identity>%.*s</identity>\
+<target uri=\"sip:%.*s\"/>\
+</remote>\
+</dialog>\
 </dialog-info>"
 
-#define DIALOGINFO_BODY_2 "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
-<dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\"> \
-   <dialog id=\"%.*s\" call-id=\"%.*s\" local-tag=\"%.*s\" remote-tag=\"%.*s\" direction=\"%.*s\"> \
-      <state>%.*s</state> \
-      <local> \
-         <identity>%.*s</identity> \
-      </local> \
-      <remote> \
-         <identity>%.*s</identity> \
-      </remote> \
-   </dialog> \
+#define DIALOGINFO_BODY_2 "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\">\
+<dialog id=\"%.*s\" call-id=\"%.*s\" local-tag=\"%.*s\" remote-tag=\"%.*s\" direction=\"%.*s\">\
+<state>%.*s</state>\
+<local>\
+<identity>%.*s</identity>\
+</local>\
+<remote>\
+<identity>%.*s</identity>\
+</remote>\
+</dialog>\
 </dialog-info>"
 
 #define json_extract_field(json_name, field)  do {                      \

+ 4 - 5
modules/kazoo/doc/kazoo.xml

@@ -14,9 +14,8 @@
 	<productname class="trade">&kamailioname;</productname>
 	<authorgroup>
 	    <author>
-		<firstname>Luis</firstname>
-		<surname>Azedo</surname>
-		<email>[email protected]</email>
+		<firstname>2600hz Inc.</firstname>
+		<email>[email protected]</email>
 	    </author>
 	    <editor>
 		<firstname>Luis</firstname>
@@ -27,12 +26,12 @@
 	<copyright>
 	    <year>2010</year>
 	    <year>2014</year>
-	    <holder>&fhg;</holder>
+	    <holder>2600hz</holder>
 	</copyright>
     </bookinfo>
     <toc></toc>
     
-    <xi:include href="db_text_admin.xml"/>
+    <xi:include href="kazoo_admin.xml"/>
     <!-- <xi:include href="db_text_devel.xml"/> -->
     
 

+ 821 - 6
modules/kazoo/doc/kazoo_admin.xml

@@ -12,11 +12,826 @@
 <chapter xmlns:xi="http://www.w3.org/2001/XInclude">
 	<title>&adminguide;</title>
 
-	<section>
-	<title>Overview</title>
-	<para>
-		Documentation to come in the next days.
-	</para>
-	</section>
+    
+    <section>
+    <title>Overview</title>
+    <para> The Kazoo is a general purpose AMQP connector (tested with rabbitmq-server). It exposes publish/consume capabilities into Kamailio.
+    </para>
+<para>
+From a high-level, the purpose of the module might be for things like:
+<itemizedlist>
+<listitem>
+<para>
+Integrate to an AMQP application to make real-time routing decisions (instead of using, say, a SQL database)
+</para>
+</listitem>
+<listitem>
+<para>
+Provide a real-time integration into your program, instead of your database, so you can overlay additional logic in your preferred language while also utilizing a message bus
+</para>
+</listitem>
+<listitem>
+<para>
+Utilize messaging to have a distributed messaging layer, such that machines processing requests/responses/events can go up/down or share the workload and your Kamailio node will still be happy
+</para>
+</listitem>
+</itemizedlist>
+</para>
+
+
+<para>
+supported operations are:
+<itemizedlist>
+<listitem>
+<para>
+publish json payloads to rabbitmq
+</para>
+</listitem>
+<listitem>
+<para>
+publish json payloads to rabbitmq and wait for correlated response message
+</para>
+</listitem>
+<listitem>
+<para>
+subscribe to an exchange with a routing key
+</para>
+</listitem>
+</itemizedlist>
+</para>
+<para>
+The Kazoo module also has support to publish updates to presence module thru the kazoo_pua_publish function
+</para>
+
+</section>
+    <section>
+    <title>How it works</title>
+<para>  
+The module works with a main forked process that does the communication with rabbitmq for issuing publishes, waiting for replies and consuming messages. When it consumes a message it defers the process to a worker process so that it doesn't block this main process.
+</para>
+    <section>
+    <title>event routes</title>
+    <para>
+The worker process issues an event-route where we can act on the received payload. The name of the event-route is composed by values extracted from the payload.
+    </para>
+    <para>
+    Kazoo module will try to execute the event route from most significant to less significant. 
+    define the event route like event_route[kazoo:consumer-event[-payload_key_value[-payload_subkey_value]]]
+    </para>
+    <para>
+    we can set the key/subkey pair on a subscription base. check the payload on subscribe.
+    </para>
+        <example>
+        <title>define the event route</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_consumer_event_key", "Event-Category")
+modparam("kazoo", "amqp_consumer_event_subkey", "Event-Name")
+...
+
+event_route[kazoo:consumer-event-presence-update]
+{
+# presence is the value extracted from Event-Category field in json payload 
+# update is the value extracted from Event-Name field in json payload 
+xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
+...
+}
+
+event_route[kazoo:consumer-event-presence]
+{
+# presence is the value extracted from Event-Category field in json payload 
+xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
+...
+}
+
+event_route[kazoo:consumer-event-event-category-event-name]
+{
+# event-category is the name of the amqp_consumer_event_key parameter
+# event-name is the name of the amqp_consumer_event_subkey parameter
+# this event route is executed if we can't find the previous 
+...
+}
+
+event_route[kazoo:consumer-event-event-category]
+{
+# event-category is the name of the amqp_consumer_event_key parameter
+# this event route is executed if we can't find the previous 
+...
+}
+
+event_route[kazoo:consumer-event]
+{
+# this event route is executed if we can't find the previous 
+}
+
+</programlisting>
+        </example>
+</section>
+    <section>
+    <title>aknowledge messages</title>
+<para>
+Consumed messages have the option of being acknowledge in two ways:
+<itemizedlist>
+<listitem>
+<para>
+immediately when received 
+</para>
+</listitem>
+<listitem>
+<para>
+after processing by the worker 
+</para>
+</listitem>
+</itemizedlist>
+
+    </para>
+    
+</section>
+    </section>
+
+    <section>
+    <title>Dependencies</title>
+    <section>
+        <title>&kamailio; Modules</title>
+        <para>
+        The following modules must be loaded before this module:
+            <itemizedlist>
+            <listitem>
+            <para>
+                <emphasis>none</emphasis>.
+            </para>
+            </listitem>
+            </itemizedlist>
+        </para>
+    </section>
+    <section>
+        <title>External Libraries or Applications</title>
+        <itemizedlist>
+            <listitem>
+            <para>
+                <emphasis>librabbitmq</emphasis>.
+            </para>
+            </listitem>
+            <listitem>
+            <para>
+                <emphasis>libjson</emphasis>.
+            </para>
+            </listitem>
+            <listitem>
+            <para>
+                <emphasis>libuuid</emphasis>.
+            </para>
+            </listitem>
+        </itemizedlist>
+
+        </section>
+    </section>
+    
+    
+    <section>
+    <title>Parameters</title>
+    <section>
+    <title>amqp related</title>
+    <section>
+        <title><varname>node_hostname</varname>(str)</title>
+        <para>
+        The name of this host to register in rabbitmq.
+        </para>
+        <para>
+        <emphasis>Default value is NULL. you must set this parameter value for the module to work</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>node_hostname</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "node_hostname", "sipproxy.mydomain.com")
+...
+</programlisting>
+        </example>
+    </section>
+    <section>
+        <title><varname>amqp_consumer_processes</varname>(int)</title>
+        <para>
+        The number of worker processes to handle messages consumption.
+        </para>
+        <para>
+        <emphasis>Default value is 4.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_consumer_processes</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_consumer_processes", 10)
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title><varname>amqp_consumer_event_key</varname>(str)</title>
+        <para>
+        The default name of the field in json payload to compose the event name 1st part
+        </para>
+        <para>
+        <emphasis>Default value is <quote>Event-Category</quote>.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_consumer_event_key</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_consumer_event_key", "My-JSON-Field-Name")
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title><varname>amqp_consumer_event_subkey</varname>(str)</title>
+        <para>
+        The default name of the field in json payload to compose the event name 2nd part
+        </para>
+        <para>
+        <emphasis>Default value is <quote>Event-Name</quote>.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_consumer_event_subkey</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_consumer_event_subkey", "My-JSON-SubField-Name")
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title><varname>amqp_max_channels</varname>(str)</title>
+        <para>
+        The number of pre allocated channels for the connection.
+        </para>
+        <para>
+        <emphasis>Default value is 50.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_max_channels</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_max_channels", 100)
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title><varname>amqp_connection</varname>(str)</title>
+        <para>
+        The connection url to rabbitmq. can be set multiple times for failover.
+        </para>
+        <example>
+        <title>Set <varname>amqp_connection</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_connection", "amqp://guest:guest@localhost:5672")
+modparam("kazoo", "amqp_connection", "kazoo://guest:guest@otherhost:5672")
+...
+</programlisting>
+        </example>
+    </section>
+
+    
+
+    
+
+
+    </section>
+
+    <section>
+    <title>execution control</title>
+    <para>execution control of main loop can be controlled by changing the parameter values in this section.</para>
+    <para>The main loop has 3 sub-loops were it listen for actions to execute with a timeout. These group of parameters allow to set the maximum number of times the sub-loop is executed if it doesn't timeout.</para>
+    <para>On busy systems, we may have a condition where a sub-loop never times out because it always has data to process. The purpose of these parameters is to set a maximum number of times it executes before it handles control to the next sub-loop.</para>
+        <programlisting format="linespecific">
+...
+while(true) // main  loop
+ while(ACK or timeout)  // acknowledge from worker process
+ while(SEND or timeout) // anything to send ?
+ while(CONSUME or timeout) // any data on consumed exchanges ?
+...
+</programlisting>
+
+    <section>
+        <title><varname>amqp_consumer_loop_count</varname>(int)</title>
+        <para>
+        The consumer loop count.
+        </para>
+        <para>
+        <emphasis>Default value is 10.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_consumer_loop_count</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_consumer_loop_count", 3)
+...
+</programlisting>
+        </example>
+    </section>    
+    
+    <section>
+        <title><varname>amqp_internal_loop_count</varname>(int)</title>
+        <para>
+        The internal listen for commands loop count.
+        </para>
+        <para>
+        <emphasis>Default value is 5.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_internal_loop_count</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_internal_loop_count", 1)
+...
+</programlisting>
+        </example>
+    </section>    
+    
+    <section>
+        <title><varname>amqp_consumer_ack_loop_count</varname>(int)</title>
+        <para>
+        The work ack loop count.
+        </para>
+        <para>
+        <emphasis>Default value is 20.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_consumer_ack_loop_count</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_consumer_ack_loop_count", 5)
+...
+</programlisting>
+        </example>
+    </section><section>
+        <title><varname>consume_messages_on_reconnect</varname>(int)</title>
+        <para>
+        This parameter indicates if the module ignores the loop counters on reconnect and consumes all the pending messages ready to be consumed.
+        </para>
+        <para>
+        <emphasis>Default value is 1.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>consume_messages_on_reconnect</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "consume_messages_on_reconnect", 0)
+...
+</programlisting>
+        </example>
+    </section><section>
+        <title><varname>single_consumer_on_reconnect</varname>(int)</title>
+        <para>
+        When the main loop receives a message from rabbitmq it will defer the execution to a worker in a round-robin manner. this parameter allows to use the same worker when kazoo reconnects to rabbitmq.
+        </para>
+        <para>
+        <emphasis>Default value is 1.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>single_consumer_on_reconnect</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "single_consumer_on_reconnect", 0)
+...
+</programlisting>
+        </example>
+    </section>    
+    
+    
+    </section>
+
+    <section>
+    <title>timers</title>
+    <para>
+    each functional parameter related to timers come with 2 reflected parameters. name_sec and name_micro 
+    </para>
+    <section>
+        <title><varname>amqp_consumer_ack_timeout</varname>(str)</title>
+        <para>
+        Timeout when checking for aknowledge from workers.
+        </para>
+        <para>
+        <emphasis>Default value is 100000 micro.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_consumer_ack_timeout</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_consumer_ack_timeout_sec", 1)
+modparam("kazoo", "amqp_consumer_ack_timeout_micro", 200000)
+...
+</programlisting>
+        </example>
+    </section>    
+
+    <section>
+        <title><varname>amqp_interprocess_timeout</varname>(str)</title>
+        <para>
+        Timeout when checking for commands (publish/query) for sending to rabbitmq.
+        </para>
+        <para>
+        <emphasis>Default value is 100000 micro.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_interprocess_timeout</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_interprocess_timeout_sec", 1)
+modparam("kazoo", "amqp_interprocess_timeout_micro", 200000)
+...
+</programlisting>
+        </example>
+    </section>    
+
+    <section>
+        <title><varname>amqp_waitframe_timout</varname>(str)</title>
+        <para>
+        Timeout when checking for messages from rabbitmq.
+        </para>
+        <para>
+        <emphasis>Default value is 100000 micro.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_waitframe_timout</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_waitframe_timout_sec", 1)
+modparam("kazoo", "amqp_waitframe_timout_micro", 200000)
+...
+</programlisting>
+        </example>
+    </section>    
+
+    <section>
+        <title><varname>amqp_query_timout</varname>(str)</title>
+        <para>
+        Timeout when checking for reply messages from rabbitmq for kazoo_query commands.
+        </para>
+        <para>
+        <emphasis>Default value is 2 sec.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>amqp_query_timout</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "amqp_query_timout_sec", 1)
+modparam("kazoo", "amqp_query_timout_micro", 200000)
+...
+</programlisting>
+        </example>
+    </section>    
+ 
+    </section>
+        
+    <section>
+    <title>presence related</title>
+    <section>
+        <title><varname>db_url</varname>(str)</title>
+        <para>
+        The database for the presentity table.
+        </para>
+        <para>If set, the kazoo_ppua_publish function will update the presentity status in the database.
+        </para>
+        <para>
+        <emphasis>Default value is <quote>NULL</quote>.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>db_url</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "db_url", "&defaultdb;")
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title><varname>presentity_table</varname>(str)</title>
+        <para>
+        The name of the presentity table in the database.
+        </para>
+        <para>
+        <emphasis>Default value is <quote>presentity</quote>.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>presentity_table</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kazoo", "presentity_table", "my_presentity_table")
+...
+</programlisting>
+        </example>
+    </section>
+
+    
+    </section>
+    
+    
+    
+
+
+</section>    
+<section>
+    <title>Functions</title>
+    <section>
+    <title>amqp related</title>
+
+    <section>
+        <title>
+        <function moreinfo="none">kazoo_publish(exchange, routing_key, json_payload)</function>
+        </title>
+        <para>
+        The function publishes a json payload to rabbitmq. The routing_key parameter should be encoded.
+        </para>
+        <para>
+        This function can be used from ANY ROUTE.
+        </para>
+
+        <example>
+        <title><function>kazoo_publish</function> usage</title>
+        <programlisting format="linespecific">
+...
+$var(amqp_payload_request) = "{'Event-Category' : 'directory', 'Event-Name' : 'reg_success', 'Contact' : '" + $var(fs_contact) + "', 'Call-ID' : '" + $ci + "', 'Realm' : '" + $fd +"', 'Username' : '" + $fU + "', 'From-User' : '" + $fU + "', 'From-Host' : '" + $fd + "', 'To-User' : '" + $tU +"', 'To-Host' : '" + $td + "', 'User-Agent' : '" + $ua +"' ," + $var(register_contants)+ " }";
+$var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $fU;
+kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request)); 
+...
+</programlisting>
+        </example>
+    </section>
+    
+    <section>
+        <title>
+        <function moreinfo="none">kazoo_query(exchange, routing_key, json_payload [, target_var])</function>
+        </title>
+        <para>
+        The function publishes a json payload to rabbitmq, waits for a correlated messageand puts the result in target_var. The routing_key parameter should be encoded.
+        target_var is optional as the function also puts the result in pseudo-variable $kzR.
+        </para>
+        <para>
+        This function can be used from ANY ROUTE.
+        </para>
+
+        <example>
+        <title><function>kazoo_query</function> usage</title>
+        <programlisting format="linespecific">
+...
+$var(amqp_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' : 'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "', 'Active-Only' : false }";
+kazoo_encode("$ci", "$var(callid_encoded)");
+$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
+if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request), "$var(amqp_result)")) {
+   kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
+   if($du != $null) {
+       xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
+       return;
+   }
+}
+...
+</programlisting>
+        </example>
+    </section>
+    
+    <section>
+        <title>
+        <function moreinfo="none">kazoo_subscribe(exchange, exchange_type, queue, routing_key)</function>
+        </title>
+        <para>
+        The function subscribes to exchange/type on queue with routing_key. The routing_key parameter should be encoded.
+        </para>
+        <para>
+        This function  must be called from event_route[kazoo:mod-init].
+        </para>
+
+        <example>
+        <title><function>kazoo_subscribe</function> usage</title>
+        <programlisting format="linespecific">
+...
+event_route[kazoo:mod-init]
+{
+   kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOSTNAME");
+}
+
+event_route[kazoo:consumer-event]
+{
+    xlog("L_INFO","Received json payload : $kzE");
+}
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title>
+        <function moreinfo="none">kazoo_subscribe(json_description)</function>
+        </title>
+        <para>
+        The function takes additional parameters to the subscribe function.
+        </para>
+        <para>        
+        <itemizedlist>
+            <title>json payload fields description</title>
+            <listitem>
+               <para>exchange : str, required</para> 
+            </listitem>
+            <listitem>
+               <para>type : str, required</para> 
+            </listitem>
+            <listitem>
+               <para>queue : str, required</para> 
+            </listitem>
+            <listitem>
+               <para>routing : str, required</para> 
+            </listitem>
+            <listitem>
+               <para>auto_delete : int, default 1</para> 
+            </listitem>
+            <listitem>
+               <para>durable : int, default 0</para> 
+            </listitem>
+            <listitem>
+               <para>no_ack : int, default 1</para> 
+            </listitem>
+            <listitem>
+               <para>wait_for_consumer_ack : int, default 0</para> 
+            </listitem>
+            <listitem>
+               <para>event_key : str, no default</para> 
+            </listitem>
+            <listitem>
+               <para>event_subkey : str, no default</para> 
+            </listitem>
+        </itemizedlist>
+        </para>
+        <para>
+        This function  must be called from event_route[kazoo:mod-init].
+        </para>
+        <example>
+        <title><function>kazoo_subscribe</function> usage</title>
+        <programlisting format="linespecific">
+...
+event_route[kazoo:mod-init]
+{
+    $var(payload) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' : 'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'auto_delete' : 0, 'durable' : 1, 'no_ack' : 0, 'wait_for_consumer_ack' : 1 }";
+    kazoo_subscribe("$var(payload)");
+}
+
+event_route[kazoo:consumer-event]
+{
+    xlog("L_INFO","Received json payload : $kzE");
+}
+...
+</programlisting>
+        </example>
+    </section>
+    
+</section>
+    <section>
+    <title>presence related</title>
+    <section>
+        <title>
+        <function moreinfo="none">kazoo_pua_publish(json_payload)</function>
+        </title>
+        <para>
+        The function build presentity state from json_payload and updates presentity table.
+        </para>
+        <para>
+        This function can be used from ANY ROUTE.
+        </para>
+
+        <example>
+        <title><function>kazoo_pua_publish</function> usage</title>
+        <programlisting format="linespecific">
+...
+event_route[kazoo:consumer-event-presence-update]
+{
+    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
+    kazoo_pua_publish($kzE);
+    pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package})", 1);
+}    
+...
+</programlisting>
+        </example>
+    </section>
+
+    
+</section>
+    <section>
+    <title>other</title>
+    <section>
+        <title>
+        <function moreinfo="none">kazoo_encode(to_encode, target_var)</function>
+        </title>
+        <para>
+        The function encodes the 1st parameter for amqp and puts the result in the 2nd parameter.
+        </para>
+        <para>
+        This function can be used from ANY ROUTE.
+        </para>
+
+        <example>
+        <title><function>kazoo_encode</function> usage</title>
+        <programlisting format="linespecific">
+...
+kazoo_encode("$ci", "$var(callid_encoded)");
+$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title>
+        <function moreinfo="none">kazoo_json(json_payload, field, target_var)</function>
+        </title>
+        <para>
+        The function extracts the value from a json payload and puts the result in the 3rd parameter.
+        It can use nested values for the query part.
+        </para>
+        <para>
+        This function can be used from ANY ROUTE.
+        </para>
+
+        <example>
+        <title><function>kazoo_json</function> usage</title>
+        <programlisting format="linespecific">
+...
+kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
+if($du != $null) {
+  xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
+  return;
+}
+...
+</programlisting>
+        </example>
+    </section>
+    
+</section>
+</section>
+    
+        <section>
+        <title>Exported pseudo-variables</title>
+        <itemizedlist>
+            <listitem>
+            <para>
+                <emphasis>$kzR</emphasis>
+                Contains the payload result of kazoo_query execution.
+            </para>
+            </listitem>
+            <listitem>
+            <para>
+                <emphasis>$kzE</emphasis>
+                Contains the payload of a consumed message
+            </para>
+            </listitem>
+        </itemizedlist>
+    </section>
+    
+        <section>
+        <title>Transformations</title>
+        <para>The prefix for kazoo transformations is kz.</para>
+        <itemizedlist>
+            <listitem><para>
+                <emphasis>json</emphasis>
+            </para>
+        <example>
+        <title><function>kz.json</function> usage</title>
+        <programlisting format="linespecific">
+...
+#kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
+$du = $kzR{kz.json,Channels[0].switch_url};
+if($du != $null) {
+  xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
+  return;
+}
+...
+</programlisting>
+        </example>
+            
+            </listitem>
+            <listitem><para>
+                <emphasis>encode</emphasis>
+            </para>
+        <example>
+        <title><function>kz.encode</function> usage</title>
+        <programlisting format="linespecific">
+...
+#kazoo_encode("$ci", "$var(callid_encoded)");
+#$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
+$var(amqp_routing_key) = "call.status_req." + $(ci{kz.encode})
+...
+</programlisting>
+        </example>
+            
+            </listitem>
+        </itemizedlist>
+    </section>
+    
+
 </chapter>
 

+ 9 - 38
modules/kazoo/kazoo.c

@@ -3,7 +3,7 @@
  *
  * Kazoo module interface
  *
- * Copyright (C) 2013 2600Hz
+ * Copyright (C) 2010-2014 2600Hz
  *
  * This file is part of Kamailio, a free SIP server.
  *
@@ -23,7 +23,7 @@
  *
  * History:
  * --------
- * 2013-04  first version (Anca Vamanu)
+ * 2014-08  first version (2600hz)
  */
 
 #include <stdio.h>
@@ -33,14 +33,8 @@
 #include "../../lib/srdb1/db.h"
 #include "../../dprint.h"
 #include "../../lib/kmi/mi.h"
-#include "../tm/tm_load.h"
 #include "../../cfg/cfg_struct.h"
 
-#include "../pua/pua.h"
-#include "../pua/pua_bind.h"
-#include "../pua/send_publish.h"
-#include "../presence/bind_presence.h"
-
 #include "kz_amqp.h"
 #include "kz_json.h"
 #include "kz_fixup.h"
@@ -63,9 +57,9 @@ int dbk_reconn_retries = 8;
 
 int dbk_presentity_phtable_size = 4096;
 
-int dbk_dialog_expires = 30;
-int dbk_presence_expires = 3600;
-int dbk_mwi_expires = 3600;
+//int dbk_dialog_expires = 30;
+//int dbk_presence_expires = 3600;
+//int dbk_mwi_expires = 3600;
 int dbk_create_empty_dialog = 1;
 
 int dbk_channels = 50;
@@ -90,11 +84,6 @@ int dbk_pua_mode = 1;
 int dbk_single_consumer_on_reconnect = 1;
 int dbk_consume_messages_on_reconnect = 1;
 
-
-struct tm_binds tmb;
-pua_api_t kz_pua_api;
-presence_api_t kz_presence_api;
-
 int startup_time = 0;
 
 int *kz_pipe_fds = NULL;
@@ -151,9 +140,9 @@ static cmd_export_t cmds[] = {
 
 static param_export_t params[] = {
     {"node_hostname", STR_PARAM, &dbk_node_hostname.s},
-    {"dialog_expires", INT_PARAM, &dbk_dialog_expires},
-    {"presence_expires", INT_PARAM, &dbk_presence_expires},
-    {"mwi_expires", INT_PARAM, &dbk_mwi_expires},
+  //  {"dialog_expires", INT_PARAM, &dbk_dialog_expires},
+  //  {"presence_expires", INT_PARAM, &dbk_presence_expires},
+  //  {"mwi_expires", INT_PARAM, &dbk_mwi_expires},
     {"amqp_connection", STR_PARAM|USE_FUNC_PARAM,(void*)kz_amqp_add_connection},
     {"amqp_max_channels", INT_PARAM, &dbk_channels},
     {"amqp_consumer_ack_timeout_micro", INT_PARAM, &kz_ack_tv.tv_usec},
@@ -195,20 +184,6 @@ struct module_exports exports = {
     mod_child_init				/* per-child init function */
 };
 
-
-
-static int kz_initialize_bindings() {
-    LM_DBG("kz_initialize_bindings\n");
-
-    /* load all TM stuff */
-    if (load_tm_api(&tmb) == -1) {
-    	LM_ERR("Can't load tm functions. Module TM not loaded?\n");
-    	return -1;
-    }
-
-    return 0;
-}
-
 static int mod_init(void) {
 	int i;
     startup_time = (int) time(NULL);
@@ -225,11 +200,6 @@ static int mod_init(void) {
 
     kz_amqp_init();
 
-    if(kz_initialize_bindings() == -1) {
-   		LM_ERR("Error initializing bindings\n");
-   		return -1;
-   	}
-
     if(dbk_pua_mode == 1) {
 		kz_db_url.len = kz_db_url.s ? strlen(kz_db_url.s) : 0;
 		LM_DBG("db_url=%s/%d/%p\n", ZSW(kz_db_url.s), kz_db_url.len,kz_db_url.s);
@@ -278,6 +248,7 @@ static int mod_init(void) {
     }
 
     register_procs(total_workers);
+    cfg_register_child(total_workers);
 
     return 0;
 }

+ 89 - 53
modules/kazoo/kz_amqp.c

@@ -5,6 +5,7 @@
 #include <amqp_framing.h>
 #include <amqp_tcp_socket.h>
 #include <json/json.h>
+#include <uuid/uuid.h>
 #include "../../mem/mem.h"
 #include "../../timer_proc.h"
 #include "../../sr_module.h"
@@ -112,6 +113,20 @@ amqp_bytes_t kz_amqp_bytes_dup_from_str(str *src)
 	return kz_amqp_bytes_malloc_dup(amqp_cstring_bytes(src->s));
 }
 
+
+void kz_amqp_free_consumer_delivery(kz_amqp_consumer_delivery_ptr ptr)
+{
+	if(ptr == NULL)
+		return;
+	if(ptr->payload)
+		shm_free(ptr->payload);
+	if(ptr->event_key)
+		shm_free(ptr->event_key);
+	if(ptr->event_subkey)
+		shm_free(ptr->event_subkey);
+	shm_free(ptr);
+}
+
 void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
 {
 	if(bind == NULL)
@@ -124,6 +139,10 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
 		kz_amqp_bytes_free(bind->queue);
 	if(bind->routing_key.bytes)
 		kz_amqp_bytes_free(bind->routing_key);
+	if(bind->event_key.bytes)
+		kz_amqp_bytes_free(bind->event_key);
+	if(bind->event_subkey.bytes)
+		kz_amqp_bytes_free(bind->event_subkey);
 	shm_free(bind);
 }
 
@@ -179,7 +198,7 @@ kz_amqp_cmd_ptr kz_amqp_alloc_pipe_cmd()
 	return cmd;
 }
 
-kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queue, str* routing_key )
+kz_amqp_bind_ptr kz_amqp_bind_alloc_ex(str* exchange, str* exchange_type, str* queue, str* routing_key, str* event_key, str* event_subkey )
 {
     kz_amqp_bind_ptr bind = NULL;
 
@@ -222,6 +241,22 @@ kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queu
 	    }
 	}
 
+	if(event_key != NULL) {
+		bind->event_key = kz_amqp_bytes_dup_from_str(event_key);
+	    if (bind->event_key.bytes == NULL) {
+			LM_ERR("Out of memory allocating for routing key\n");
+			goto error;
+	    }
+	}
+
+	if(event_subkey != NULL) {
+		bind->event_subkey = kz_amqp_bytes_dup_from_str(event_subkey);
+	    if (bind->event_subkey.bytes == NULL) {
+			LM_ERR("Out of memory allocating for routing key\n");
+			goto error;
+	    }
+	}
+
 	return bind;
 
 error:
@@ -229,6 +264,11 @@ error:
     return NULL;
 }
 
+kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queue, str* routing_key )
+{
+	return kz_amqp_bind_alloc_ex(exchange, exchange_type, queue, routing_key, NULL, NULL );
+}
+
 void kz_amqp_init_connection_pool() {
 	if(kz_pool == NULL) {
 		kz_pool = (kz_amqp_conn_pool_ptr) shm_malloc(sizeof(kz_amqp_conn_pool));
@@ -575,7 +615,14 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
     str unique_string = { 0, 0 };
     char serverid[512];
 
-    tmb.generate_callid(&unique_string);
+    uuid_t id;
+    char uuid_buffer[40];
+
+    uuid_generate_random(id);
+    uuid_unparse_lower(id, uuid_buffer);
+    unique_string.s = uuid_buffer;
+    unique_string.len = strlen(unique_string.s);
+
     sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++);
 
 
@@ -642,7 +689,14 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
     str unique_string = { 0, 0 };
     char serverid[512];
 
-    tmb.generate_callid(&unique_string);
+    uuid_t id;
+    char uuid_buffer[40];
+
+    uuid_generate_random(id);
+    uuid_unparse_lower(id, uuid_buffer);
+    unique_string.s = uuid_buffer;
+    unique_string.len = strlen(unique_string.s);
+
     sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++);
 
 
@@ -802,7 +856,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha
 			return -1;
 		}
 
-		char* strjson = json_object_to_json_string(ret);
+		char* strjson = (char*)json_object_to_json_string(ret);
 		int len = strlen(strjson);
 		char* value = pkg_malloc(len+1);
 		memcpy(value, strjson, len);
@@ -908,6 +962,8 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
 	str queue_s;
 	str routing_key_s;
 	str payload_s;
+	str key_s;
+	str subkey_s;
 	int passive = 0;
 	int durable = 0;
 	int exclusive = 0;
@@ -935,6 +991,8 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
     json_extract_field("type", exchange_type_s);
     json_extract_field("queue", queue_s);
     json_extract_field("routing", routing_key_s);
+    json_extract_field("event_key", key_s);
+    json_extract_field("event_subkey", subkey_s);
 
     tmpObj = json_object_object_get(json_obj, "passive");
     if(tmpObj != NULL) {
@@ -1105,43 +1163,6 @@ int get_channel_index() {
 	return get_channel_index();
 }
 
-/*
-int kz_amqp_unbind_channel(kz_amqp_conn_ptr kz_conn, int idx )
-{
-    kz_amqp_bind_ptr reply = channels[idx].targeted;
-    int ret = 0;
-	if(reply == NULL) {
-		LM_ERR("unbinding channel NULL??\n");
-		ret = -1;
-		goto error;
-	}
-
-    if (amqp_basic_cancel(kz_conn->conn, channels[idx].channel, amqp_empty_bytes) < 0
-	    || kz_amqp_error("Canceling", amqp_get_rpc_reply(kz_conn->conn)))
-    {
-		ret = -RET_AMQP_ERROR;
-		goto error;
-    }
-
-    if (amqp_queue_unbind(kz_conn->conn, channels[idx].channel, reply->queue, reply->exchange, reply->routing_key, amqp_empty_table) < 0
-	    || kz_amqp_error("Unbinding queue", amqp_get_rpc_reply(kz_conn->conn)))
-    {
-		ret = -RET_AMQP_ERROR;
-		goto error;
-    }
-
-    amqp_queue_delete(kz_conn->conn, channels[idx].channel, reply->queue, 0, 0);
-
-    kz_amqp_free_binding(reply);
-    channels[idx].targeted = NULL;
-    channels[idx].state = KZ_AMQP_FREE;
-
-error:
-	return ret;
-}
-*/
-
-
 int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int idx )
 {
     kz_amqp_bind_ptr bind = NULL;
@@ -1382,7 +1403,7 @@ int kz_amqp_consumer_fire_event(char *eventkey)
 
 }
 
-void kz_amqp_consumer_event(int child_no, char *payload)
+void kz_amqp_consumer_event(int child_no, char *payload, char* event_key, char* event_subkey)
 {
     json_obj_ptr json_obj = NULL;
     str ev_name = {0, 0}, ev_category = {0, 0};
@@ -1399,20 +1420,33 @@ void kz_amqp_consumer_event(int child_no, char *payload)
 		return;
     }
 
-    json_extract_field(dbk_consumer_event_key.s, ev_category);
-    json_extract_field(dbk_consumer_event_subkey.s, ev_name);
+    char* key = (event_key == NULL ? dbk_consumer_event_key.s : event_key);
+    char* subkey = (event_subkey == NULL ? dbk_consumer_event_subkey.s : event_subkey);
+
+    json_extract_field(key, ev_category);
+    json_extract_field(subkey, ev_name);
 
     sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
     for (p=buffer ; *p; ++p) *p = tolower(*p);
     for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
     if(kz_amqp_consumer_fire_event(buffer) != 0) {
-        sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",dbk_consumer_event_key.len, dbk_consumer_event_key.s, dbk_consumer_event_subkey.len, dbk_consumer_event_subkey.s);
+        sprintf(buffer, "kazoo:consumer-event-%.*s",ev_category.len, ev_category.s);
         for (p=buffer ; *p; ++p) *p = tolower(*p);
         for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
         if(kz_amqp_consumer_fire_event(buffer) != 0) {
-            sprintf(buffer, "kazoo:consumer-event");
+            sprintf(buffer, "kazoo:consumer-event-%s-%s", key, subkey);
+            for (p=buffer ; *p; ++p) *p = tolower(*p);
+            for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
             if(kz_amqp_consumer_fire_event(buffer) != 0) {
-                LM_ERR("kazoo:consumer-event not found");
+                sprintf(buffer, "kazoo:consumer-event-%s", key);
+                for (p=buffer ; *p; ++p) *p = tolower(*p);
+                for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+				if(kz_amqp_consumer_fire_event(buffer) != 0) {
+					sprintf(buffer, "kazoo:consumer-event");
+					if(kz_amqp_consumer_fire_event(buffer) != 0) {
+						LM_ERR("kazoo:consumer-event not found");
+					}
+				}
             }
         }
     }
@@ -1447,7 +1481,7 @@ void kz_amqp_consumer_loop(int child_no)
 				kz_amqp_consumer_delivery_ptr ptr;
 				if(read(data_pipe, &ptr, sizeof(ptr)) == sizeof(ptr)) {
 					LM_DBG("consumer %d received payload %s\n", child_no, ptr->payload);
-					kz_amqp_consumer_event(child_no, ptr->payload);
+					kz_amqp_consumer_event(child_no, ptr->payload, ptr->event_key, ptr->event_subkey);
 					if(ptr->channel > 0 && ptr->delivery_tag > 0) {
 						kz_amqp_cmd_ptr cmd = kz_amqp_alloc_pipe_cmd();
 						cmd->type = KZ_AMQP_ACK;
@@ -1457,8 +1491,7 @@ void kz_amqp_consumer_loop(int child_no)
 							LM_ERR("failed to send ack to AMQP Manager in process %d, write to command pipe: %s\n", getpid(), strerror(errno));
 						}
 					}
-					shm_free(ptr->payload);
-					shm_free(ptr);
+					kz_amqp_free_consumer_delivery(ptr);
 				}
 			}
     	}
@@ -1479,7 +1512,7 @@ int check_timeout(struct timeval *now, struct timeval *start, struct timeval *ti
 
 int consumer = 1;
 
-void kz_amqp_send_consumer_event_ex(char* payload, amqp_channel_t channel, uint64_t delivery_tag, int nextConsumer)
+void kz_amqp_send_consumer_event_ex(char* payload, char* event_key, char* event_subkey, amqp_channel_t channel, uint64_t delivery_tag, int nextConsumer)
 {
 	kz_amqp_consumer_delivery_ptr ptr = (kz_amqp_consumer_delivery_ptr) shm_malloc(sizeof(kz_amqp_consumer_delivery));
 	if(ptr == NULL) {
@@ -1490,7 +1523,8 @@ void kz_amqp_send_consumer_event_ex(char* payload, amqp_channel_t channel, uint6
 	ptr->channel = channel;
 	ptr->delivery_tag = delivery_tag;
 	ptr->payload = payload;
-
+	ptr->event_key = event_key;
+	ptr->event_subkey = event_subkey;
 	if (write(kz_pipe_fds[consumer*2+1], &ptr, sizeof(ptr)) != sizeof(ptr)) {
 		LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), payload);
 	}
@@ -1505,7 +1539,7 @@ void kz_amqp_send_consumer_event_ex(char* payload, amqp_channel_t channel, uint6
 
 void kz_amqp_send_consumer_event(char* payload, int nextConsumer)
 {
-	kz_amqp_send_consumer_event_ex(payload, 0, 0, nextConsumer);
+	kz_amqp_send_consumer_event_ex(payload, NULL, NULL, 0, 0, nextConsumer);
 }
 
 void kz_amqp_fire_connection_event(char *event, char* host)
@@ -1717,6 +1751,8 @@ void kz_amqp_manager_loop(int child_no)
 						break;
 					case KZ_AMQP_CONSUMING:
 						kz_amqp_send_consumer_event_ex(kz_amqp_bytes_dup(envelope.message.body),
+								kz_amqp_bytes_dup(channels[idx].consumer->event_key),
+								kz_amqp_bytes_dup(channels[idx].consumer->event_subkey),
 								channels[idx].consumer->no_ack ? 0 : envelope.channel,
 								channels[idx].consumer->no_ack ? 0 : envelope.delivery_tag,
 								(firstLoop && dbk_single_consumer_on_reconnect) ? 0 : 1);

+ 4 - 2
modules/kazoo/kz_amqp.h

@@ -11,7 +11,6 @@
 #include <amqp.h>
 
 #include "../../sr_module.h"
-#include "../tm/tm_load.h"
 
 #include "const.h"
 #include "defs.h"
@@ -22,7 +21,6 @@ typedef kz_amqp_connection_info *kz_amqp_connection_info_ptr;
 
 extern int dbk_channels;
 extern str dbk_node_hostname;
-extern struct tm_binds tmb;
 extern str dbk_consumer_event_key;
 extern str dbk_consumer_event_subkey;
 extern int dbk_consumer_processes;
@@ -86,6 +84,8 @@ typedef struct {
 	char* payload;
 	uint64_t delivery_tag;
 	amqp_channel_t channel;
+	char* event_key;
+	char* event_subkey;
 } kz_amqp_consumer_delivery, *kz_amqp_consumer_delivery_ptr;
 
 typedef struct {
@@ -93,6 +93,8 @@ typedef struct {
 	amqp_bytes_t exchange_type;
 	amqp_bytes_t routing_key;
 	amqp_bytes_t queue;
+	amqp_bytes_t event_key;
+	amqp_bytes_t event_subkey;
 	amqp_boolean_t passive;
 	amqp_boolean_t durable;
 	amqp_boolean_t exclusive;

+ 10 - 7
modules/kazoo/kz_pua.c

@@ -18,9 +18,6 @@
 #include "const.h"
 
 
-extern int dbk_dialog_expires;
-extern int dbk_presence_expires;
-extern int dbk_mwi_expires;
 extern int dbk_include_entity;
 extern int dbk_pua_mode;
 
@@ -87,7 +84,7 @@ int kz_pua_update_presentity(str* event, str* realm, str* user, str* etag, str*
 	query_cols[n_query_cols] = &str_expires_col;
 	query_vals[n_query_cols].type = DB1_INT;
 	query_vals[n_query_cols].nul = 0;
-	query_vals[n_query_cols].val.int_val = expires+(int)time(NULL);
+	query_vals[n_query_cols].val.int_val = expires;
 	n_query_cols++;
 
 	if (kz_pa_dbf.use_table(kz_pa_db, &kz_presentity_table) < 0)
@@ -173,7 +170,7 @@ int kz_pua_publish_presence_to_presentity(struct json_object *json_obj) {
     str activity = str_init("");
     str note = str_init("Idle");
     str status = str_presence_status_online;
-    int expires = dbk_presence_expires;
+    int expires = 0;
 
     char *body = (char *)pkg_malloc(PRESENCE_BODY_BUFFER_SIZE);
     if(body == NULL) {
@@ -197,6 +194,8 @@ int kz_pua_publish_presence_to_presentity(struct json_object *json_obj) {
     struct json_object* ExpiresObj = json_object_object_get(json_obj, BLF_JSON_EXPIRES);
     if(ExpiresObj != NULL) {
     	expires = json_object_get_int(ExpiresObj);
+    	if(expires > 0)
+    		expires += (int)time(NULL);
     }
 
     if (!from_user.len || !to_user.len || !state.len) {
@@ -250,7 +249,7 @@ int kz_pua_publish_mwi_to_presentity(struct json_object *json_obj) {
         mwi_new = { 0, 0 }, mwi_saved = { 0, 0 },
         mwi_urgent = { 0, 0 }, mwi_urgent_saved = { 0, 0 },
         mwi_account = { 0, 0 }, mwi_body = { 0, 0 };
-    int expires = dbk_mwi_expires;
+    int expires = 0;
 
     char *body = (char *)pkg_malloc(MWI_BODY_BUFFER_SIZE);
     if(body == NULL) {
@@ -280,6 +279,8 @@ int kz_pua_publish_mwi_to_presentity(struct json_object *json_obj) {
     struct json_object* ExpiresObj = json_object_object_get(json_obj, BLF_JSON_EXPIRES);
     if(ExpiresObj != NULL) {
     	expires = json_object_get_int(ExpiresObj);
+    	if(expires > 0)
+    		expires += (int)time(NULL);
     }
 
     sprintf(body, MWI_BODY, mwi_waiting.len, mwi_waiting.s,
@@ -314,7 +315,7 @@ int kz_pua_publish_dialoginfo_to_presentity(struct json_object *json_obj) {
     char sender_buf[1024];
     str sender = {0, 0};
     str dialoginfo_body = {0 , 0};
-    int expires = dbk_dialog_expires;
+    int expires = 0;
     str event = str_init("dialog");
     int reset = 0;
 
@@ -341,6 +342,8 @@ int kz_pua_publish_dialoginfo_to_presentity(struct json_object *json_obj) {
     struct json_object* ExpiresObj = json_object_object_get(json_obj, BLF_JSON_EXPIRES);
     if(ExpiresObj != NULL) {
     	expires = json_object_get_int(ExpiresObj);
+    	if(expires > 0)
+    		expires += (int)time(NULL);
     }
 
     ExpiresObj = json_object_object_get(json_obj, "Flush-Level");