Browse Source

nsq: deprecate json and pua funcs for json api and pua_json modules

Emmanuel Schmidbauer 7 năm trước cách đây
mục cha
commit
1f09a38982

+ 0 - 3
src/modules/nsq/Makefile

@@ -12,7 +12,4 @@ NAME=nsq.so
 LIBS=-lnsq -lev -levbuffsock -lcurl -ljson-c
 DEFS+=-I$(LOCALBASE)/include -I/usr/local/include $(shell pkg-config --cflags json-c)
 
-SERLIBPATH=../../lib
-SER_LIBS=$(SERLIBPATH)/srdb1/srdb1
-
 include ../../Makefile.modules

+ 8 - 123
src/modules/nsq/doc/nsq_admin.xml

@@ -52,8 +52,9 @@
 			</itemizedlist>
 		</para>
 		<para>
-			The NSQ module also has support to publish updates to presence module through
-			the nsq_pua_publish() function.
+			<emphasis>IMPORTANT</emphasis>
+			The `nsq.json` transformation is deprecated in favor of the json module's `json.parse` transformation.
+			The `nsq_pua_publish()` function is deprecated in favor of pua_json module's `pua_json_publish()` function.
 		</para>
 
 	</section>
@@ -343,99 +344,6 @@ modparam("nsq", "topic_channel", "My-NSQ-Topic-2:My-NSQ-Channel-2")
 			</example>
 		</section>
 
-		<section>
-			<title><varname>db_url</varname>(str)</title>
-			<para>
-				The database for the presentity table.
-			</para>
-			<para>
-				If set, the nsq_pua_publish function will update the presentity status in the database.
-			</para>
-			<para>
-				Usage: presence related.
-			</para>
-			<para>
-				<emphasis>Default value is <quote>NULL</quote>.</emphasis>
-			</para>
-			<example>
-			<title>Set <varname>db_url</varname> parameter</title>
-<programlisting format="linespecific">
-...
-modparam("nsq", "db_url", "&defaultdb;")
-...
-</programlisting>
-			</example>
-		</section>
-
-		<section>
-			<title><varname>presentity_table</varname>(str)</title>
-			<para>
-				The name of the presentity table in the database.
-			</para>
-			<para>
-				<emphasis>Default value is <quote>presentity</quote>.</emphasis>
-			</para>
-			<example>
-				<title>Set <varname>presentity_table</varname> parameter</title>
-<programlisting format="linespecific">
-...
-modparam("nsq", "presentity_table", "my_presentity_table")
-...
-</programlisting>
-			</example>
-		</section>
-
-		<section>
-			<title><varname>db_table_lock_type</varname>(int)</title>
-			<para>
-			Enable (=1) or disable (=0) the locks for table during a
-			transaction.
-			</para>
-			<para>
-				<emphasis>Default value is <quote>1</quote>.</emphasis>
-			</para>
-			<example>
-				<title>Set <varname>db_table_lock_type</varname> parameter</title>
-<programlisting format="linespecific">
-...
-modparam("nsq", "db_table_lock_type", 0)
-...
-</programlisting>
-			</example>
-		</section>
-
-	</section>
-	<section>
-		<title>Functions</title>
-		<section>
-			<title>
-				<function moreinfo="none">nsq_pua_publish(json_payload)</function>
-			</title>
-			<para>
-				The function build presentity state from json_payload and updates presentity table.
-			</para>
-			<para>
-				Usage: presence related.
-			</para>
-			<para>
-				This function can be used from ANY ROUTE.
-			</para>
-
-			<example>
-				<title><function>nsq_pua_publish</function> usage</title>
-<programlisting format="linespecific">
-...
-event_route[nsq:consumer-event-presence-update]
-{
-	xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{nsq.json,From})");
-	nsq_pua_publish($nsqE);
-	pres_refresh_watchers("$(nsqE{nsq.json,From})", "$(nsqE{nsq.json,Event-Package})", 1);
-}
-...
-</programlisting>
-			</example>
-		</section>
-
 	</section>
 
 	<section>
@@ -459,32 +367,6 @@ event_route[nsq:consumer-event-presence-update]
 		</itemizedlist>
 	</section>
 
-	<section>
-		<title>Transformations</title>
-		<para>The prefix for nsq transformations is nsq.</para>
-		<para>You can use the transformation to extract values from the json structured $nsqE pseudo variable</para>
-		<itemizedlist>
-			<listitem><para>
-					<emphasis>json</emphasis>
-				</para>
-				<example>
-					<title><function>nsq.json</function> usage</title>
-<programlisting format="linespecific">
-...
-# extract value of "Custom-Data" from $nsqE pseudo variable and set it to $var(Custom-Data)
-$var(Custom-Data) = $(nsqE{nsq.json,Custom-Data});
-if($var(Custom-Data) != $null) {
-	xlog("L_INFO", "$ci|log|custom data: $var(Custom-Data)");
-}
-...
-</programlisting>
-				</example>
-
-			</listitem>
-
-		</itemizedlist>
-	</section>
-
 	<section>
 		<title>Event Routes</title>
 			<para>
@@ -509,14 +391,17 @@ event_route[nsq:consumer-event-presence-update]
 {
 	# presence is the value extracted from Event-Category field in json payload
 	# update is the value extracted from Event-Name field in json payload
-	xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{nsq.json,From})");
+	if ($(nsqE{json.parse,Event-Package}) == "dialog") {
+		xlog("L_INFO", "received $(nsqE{json.parse,Event-Package}) update for $(nsqE{json.parse,From})");
+		pua_json_publish($nsqE);
+	}
 	...
 }
 
 event_route[nsq:consumer-event-presence]
 {
 	# presence is the value extracted from Event-Category field in json payload
-	xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{nsq.json,From})");
+	xlog("L_INFO", "received $(nsqE{json.parse,Event-Package}) update for $(nsqE{json.parse,From})");
 	...
 }
 

+ 0 - 308
src/modules/nsq/nsq_json.c

@@ -1,308 +0,0 @@
-/*
- * NSQ module interface
- *
- * Copyright (C) 2010-2014 2600Hz
- *
- * This file is part of Kamailio, a free SIP server.
- *
- * Kamailio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version
- *
- * Kamailio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- *
- * Contributor(s):
- * Emmanuel Schmidbauer <[email protected]>
- *
- */
-
-#include "nsq_json.h"
-
-# define json_foreach_key(obj,key) \
-	char *key;\
-	struct lh_entry *entry ## key; \
-	struct lh_entry *entry_next ## key = NULL; \
-	for(entry ## key = json_object_get_object(obj)->head; \
-		(entry ## key ? ( \
-			key = (char*)entry ## key->k, \
-			entry_next ## key = entry ## key->next, \
-			entry ## key) : 0); \
-		entry ## key = entry_next ## key)
-
-
-
-static str nsq_pv_str_empty = {"", 0};
-
-char **str_split(char* a_str, const char a_delim)
-{
-	char** result    = 0;
-	size_t count     = 0;
-	char* tmp        = a_str;
-	char* last_comma = 0;
-	char delim[2];
-	delim[0] = a_delim;
-	delim[1] = 0;
-	int len = 0;
-
-	/* Count how many elements will be extracted. */
-	while (*tmp) {
-		if (a_delim == *tmp) {
-			count++;
-			last_comma = tmp;
-		}
-		tmp++;
-	}
-
-	/* Add space for trailing token. */
-	count += last_comma < (a_str + strlen(a_str) - 1);
-
-	/* Add space for terminating null string so caller
-	   knows where the list of returned strings ends. */
-	count++;
-
-	result = pkg_malloc(sizeof(char*) * count);
-
-	if (result) {
-		size_t idx  = 0;
-		char* token = strtok(a_str, delim);
-
-		while (token) {
-			assert(idx < count);
-			len = strlen(token);
-			char* ptr = pkg_malloc( (len+1) * sizeof(char));
-			*(result + idx) = ptr;
-			memcpy(ptr, token, len);
-			ptr[len] = '\0';
-			int i = 0;
-			while(i < len) {
-				if (ptr[i] == nsq_json_escape_char)
-					ptr[i] = '.';
-				i++;
-			}
-			token = strtok(0, delim);
-			idx++;
-		}
-		assert(idx == count - 1);
-		*(result + idx) = 0;
-	}
-
-	return result;
-}
-
-struct json_object * nsq_json_get_field_object(str* json, str* field)
-{
-	char** tokens;
-	char* dup;
-	char f1[25], f2[25];//, f3[25];
-	int i;
-
-	dup = pkg_malloc(json->len+1);
-	memcpy(dup, json->s, json->len);
-	dup[json->len] = '\0';
-	struct json_object *j = json_tokener_parse(dup);
-	pkg_free(dup);
-
-	if (j==NULL) {
-		LM_ERR("empty or invalid JSON\n");
-		return NULL;
-	}
-
-	struct json_object *jtree = NULL;
-	struct json_object *ret = NULL;
-
-	LM_DBG("getting json %.*s\n", field->len, field->s);
-
-	dup = pkg_malloc(field->len+1);
-	memcpy(dup, field->s, field->len);
-	dup[field->len] = '\0';
-	tokens = str_split(dup, '.');
-	pkg_free(dup);
-
-	if (tokens) {
-		jtree = j;
-		for (i = 0; *(tokens + i); i++) {
-			if (jtree != NULL) {
-				str field = str_init(*(tokens + i));
-				// check for idx []
-				int sresult = sscanf(field.s, "%[^[][%[^]]]", f1, f2); //, f3);
-				LM_DBG("CHECK IDX %d - %s , %s, %s\n", sresult, field.s, f1, (sresult > 1? f2 : "(null)"));
-
-				jtree = nsq_json_get_object(jtree, f1);
-				if (jtree != NULL) {
-					char *value = (char*)json_object_get_string(jtree);
-					LM_DBG("JTREE OK %s\n", value);
-				}
-				if (jtree != NULL && sresult > 1 && json_object_is_type(jtree, json_type_array)) {
-					int idx = atoi(f2);
-					jtree = json_object_array_get_idx(jtree, idx);
-					if (jtree != NULL) {
-						char *value = (char*)json_object_get_string(jtree);
-						LM_DBG("JTREE IDX OK %s\n", value);
-					}
-				}
-			}
-			pkg_free(*(tokens + i));
-		}
-		pkg_free(tokens);
-	}
-
-
-	if (jtree != NULL)
-		ret = json_object_get(jtree);
-
-	json_object_put(j);
-
-	return ret;
-}
-
-
-int nsq_json_get_field_ex(str* json, str* field, pv_value_p dst_val)
-{
-	struct json_object *jtree = nsq_json_get_field_object(json, field);
-
-
-	if (jtree != NULL) {
-		char *value = (char*)json_object_get_string(jtree);
-		int len = strlen(value);
-		dst_val->rs.s = pkg_malloc(len+1);
-		memcpy(dst_val->rs.s, value, len);
-		dst_val->rs.s[len] = '\0';
-		dst_val->rs.len = len;
-		dst_val->flags = PV_VAL_STR | PV_VAL_PKG;
-		dst_val->ri = 0;
-		json_object_put(jtree);
-	} else {
-		dst_val->flags = PV_VAL_NULL;
-		dst_val->rs = nsq_pv_str_empty;
-		dst_val->ri = 0;
-	}
-	return 1;
-}
-
-
-int nsq_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst)
-{
-	str json_s;
-	str field_s;
-	pv_spec_t *dst_pv;
-	pv_value_t dst_val;
-
-	if (fixup_get_svalue(msg, (gparam_p)json, &json_s) != 0) {
-		LM_ERR("cannot get json string value\n");
-		return -1;
-	}
-
-	if (fixup_get_svalue(msg, (gparam_p)field, &field_s) != 0) {
-		LM_ERR("cannot get field string value\n");
-		return -1;
-	}
-
-	if (nsq_json_get_field_ex(&json_s, &field_s, &dst_val) != 1)
-		return -1;
-
-	dst_pv = (pv_spec_t *)dst;
-	dst_pv->setf(msg, &dst_pv->pvp, (int)EQ_T, &dst_val);
-	if (dst_val.flags & PV_VAL_PKG) {
-		pkg_free(dst_val.rs.s);
-	} else if (dst_val.flags & PV_VAL_SHM) {
-		shm_free(dst_val.rs.s);
-	}
-
-	return 1;
-}
-
-struct json_object* nsq_json_parse(const char *str)
-{
-	struct json_tokener* tok;
-	struct json_object* obj;
-
-	tok = json_tokener_new();
-	if (!tok) {
-		LM_ERR("Error parsing json: could not allocate tokener\n");
-		return NULL;
-	}
-
-	obj = json_tokener_parse_ex(tok, str, -1);
-	if (tok->err != json_tokener_success) {
-		LM_ERR("Error parsing json: %s\n", json_tokener_error_desc(tok->err));
-		LM_ERR("%s\n", str);
-		if (obj != NULL) {
-			json_object_put(obj);
-		}
-		obj = NULL;
-	}
-
-	json_tokener_free(tok);
-	return obj;
-}
-
-struct json_object* nsq_json_get_object(struct json_object* jso, const char *key)
-{
-	struct json_object *result = NULL;
-	json_object_object_get_ex(jso, key, &result);
-	return result;
-}
-
-int nsq_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst)
-{
-	str json_s;
-	str field_s;
-	int_str keys_avp_name;
-	unsigned short keys_avp_type;
-	pv_spec_t *avp_spec;
-
-	if (fixup_get_svalue(msg, (gparam_p)json, &json_s) != 0) {
-		LM_ERR("cannot get json string value\n");
-		return -1;
-	}
-
-	if (fixup_get_svalue(msg, (gparam_p)field, &field_s) != 0) {
-		LM_ERR("cannot get field string value\n");
-		return -1;
-	}
-
-	if (dst == NULL){
-		LM_ERR("avp spec is null\n");
-		return -1;
-	}
-
-	avp_spec = (pv_spec_t *)dst;
-
-	if (avp_spec->type != PVT_AVP) {
-		LM_ERR("invalid avp spec\n");
-		return -1;
-	}
-
-	if (pv_get_avp_name(0, &avp_spec->pvp, &keys_avp_name, &keys_avp_type)!=0) {
-		LM_ERR("invalid AVP definition\n");
-		return -1;
-	}
-
-	struct json_object *jtree = nsq_json_get_field_object(&json_s, &field_s);
-
-	if (jtree != NULL) {
-		json_foreach_key(jtree, k) {
-			LM_DBG("ITERATING KEY %s\n", k);
-			int_str v1;
-			v1.s.s = k;
-			v1.s.len = strlen(k);
-			if (add_avp(AVP_VAL_STR|keys_avp_type, keys_avp_name, v1) < 0) {
-				LM_ERR("failed to create AVP\n");
-			    json_object_put(jtree);
-				return -1;
-			}
-		}
-	    json_object_put(jtree);
-	}
-
-	return 1;
-}
-

+ 0 - 59
src/modules/nsq/nsq_json.h

@@ -1,59 +0,0 @@
-/*
- * NSQ module interface
- *
- * Copyright (C) 2010-2014 2600Hz
- *
- * This file is part of Kamailio, a free SIP server.
- *
- * Kamailio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version
- *
- * Kamailio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- *
- * Contributor(s):
- * Emmanuel Schmidbauer <[email protected]>
- *
- */
-
-#ifndef __NSQ_JSON_H_
-#define __NSQ_JSON_H_
-
-#include "../../core/mod_fix.h"
-#include "../../core/lvalue.h"
-#include "../../core/parser/msg_parser.h"
-#include <json.h>
-
-#define json_extract_field(json_name, field)  do {                      \
-	struct json_object* obj =  nsq_json_get_object(json_obj, json_name); \
-	field.s = (char*)json_object_get_string(obj);                       \
-	if (field.s == NULL) {                                              \
-	  LM_DBG("Json-c error - failed to extract field [%s]\n", json_name); \
-	  field.s = "";                                                     \
-	} else {                                                            \
-	  field.len = strlen(field.s);                                      \
-	}                                                                   \
-	LM_DBG("%s: [%s]\n", json_name, field.s?field.s:"Empty");           \
-  } while (0);
-
-
-extern char nsq_json_escape_char;
-extern str nsq_event_key;
-extern str nsq_event_sub_key;
-
-int nsq_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst);
-int nsq_json_get_field_ex(str* json, str* field, pv_value_p dst_val);
-int nsq_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst);
-
-struct json_object* nsq_json_parse(const char *str);
-struct json_object* nsq_json_get_object(struct json_object* jso, const char *key);
-
-#endif /* __NSQ_JSON_H_ */

+ 11 - 85
src/modules/nsq/nsq_mod.c

@@ -24,28 +24,19 @@
  *
  */
 
+#include "../json/api.h"
 #include "nsq_mod.h"
 
 MODULE_VERSION
 
-static tr_export_t mod_trans[] = {
-	{ {"nsq", sizeof("nsq")-1}, nsq_tr_parse},
-	{ { 0, 0 }, 0 }
-};
-
 static pv_export_t nsq_mod_pvs[] = {
 	{{"nsqE", (sizeof("nsqE")-1)}, PVT_OTHER, nsq_pv_get_event_payload, 0, 0, 0, 0, 0},
 	{ {0, 0}, 0, 0, 0, 0, 0, 0, 0 }
 };
 
-static cmd_export_t cmds[] = {
-	{"nsq_pua_publish", (cmd_function) nsq_pua_publish, 1, 0, 0, ANY_ROUTE},
-	{0, 0, 0, 0, 0, 0}
-};
-
 static param_export_t params[]=
 {
-	{"consumer_workers", INT_PARAM, &dbn_consumer_workers},
+	{"consumer_workers", INT_PARAM, &nsq_consumer_workers},
 	{"max_in_flight", INT_PARAM, &nsq_max_in_flight},
 	{"lookupd_address", PARAM_STR, &nsq_lookupd_address},
 	{"lookupd_port", INT_PARAM, &lookupd_port},
@@ -55,12 +46,6 @@ static param_export_t params[]=
 	{"nsqd_port", INT_PARAM, &nsqd_port},
 	{"consumer_event_key", PARAM_STR, &nsq_event_key},
 	{"consumer_event_subkey", PARAM_STR, &nsq_event_sub_key},
-	{"pua_include_entity", INT_PARAM, &dbn_include_entity},
-	{"presentity_table", PARAM_STR, &nsq_presentity_table},
-	{"db_url", PARAM_STR, &nsq_db_url},
-	{"pua_mode", INT_PARAM, &dbn_pua_mode},
-	{"json_escape_char", PARAM_STR, &nsq_json_escape_str},
-	{"db_table_lock_type", INT_PARAM, &db_table_lock_type},
 	{ 0, 0, 0 }
 };
 
@@ -115,7 +100,7 @@ static int nsq_add_topic_channel(modparam_t type, void *val)
 struct module_exports exports = {
 	"nsq",
 	DEFAULT_DLFLAGS,	/* dlopen flags */
-	cmds,			/* Exported functions */
+	0,			/* Exported functions */
 	params,			/* Exported parameters */
 	0,                      /* exported MI functions */
 	nsq_mod_pvs,            /* exported pseudo-variables */
@@ -157,50 +142,18 @@ static int fire_init_event(int rank)
 
 static int mod_init(void)
 {
-	startup_time = (int) time(NULL);
-
-	if (dbn_pua_mode == 1) {
-		nsq_db_url.len = nsq_db_url.s ? strlen(nsq_db_url.s) : 0;
-		LM_DBG("db_url=%s/%d/%p\n", ZSW(nsq_db_url.s), nsq_db_url.len,nsq_db_url.s);
-		nsq_presentity_table.len = strlen(nsq_presentity_table.s);
-
-		if (nsq_db_url.len > 0) {
-
-			/* binding to database module  */
-			if (db_bind_mod(&nsq_db_url, &nsq_pa_dbf)) {
-				LM_ERR("Database module not found\n");
-				return -1;
-			}
-
-			if (!DB_CAPABILITY(nsq_pa_dbf, DB_CAP_ALL)) {
-				LM_ERR("Database module does not implement all functions"
-						" needed by NSQ module\n");
-				return -1;
-			}
-
-			nsq_pa_db = nsq_pa_dbf.init(&nsq_db_url);
-			if (!nsq_pa_db) {
-				LM_ERR("Connection to database failed\n");
-				return -1;
-			}
-
-			if (db_table_lock_type != 1) {
-				db_table_lock = DB_LOCKING_NONE;
-			}
-
-			nsq_pa_dbf.close(nsq_pa_db);
-			nsq_pa_db = NULL;
-		}
+	if(json_load_api(&json_api) < 0) {
+		LM_ERR("cannot bind to JSON API\n");
+		return -1;
 	}
-
-	LM_DBG("NSQ Workers per Topic/Channel: %d\n", dbn_consumer_workers);
+	LM_DBG("NSQ Workers per Topic/Channel: %d\n", nsq_consumer_workers);
 	if (!nsq_topic_channel_counter) {
 		nsq_topic_channel_counter = 1;
 	}
 	LM_DBG("NSQ Total Topic/Channel: %d\n", nsq_topic_channel_counter);
-	dbn_consumer_workers = dbn_consumer_workers * nsq_topic_channel_counter;
-	LM_DBG("NSQ Total Workers: %d\n", dbn_consumer_workers);
-	int total_workers = dbn_consumer_workers + 2;
+	nsq_consumer_workers = nsq_consumer_workers * nsq_topic_channel_counter;
+	LM_DBG("NSQ Total Workers: %d\n", nsq_consumer_workers);
+	int total_workers = nsq_consumer_workers + 2;
 
 	register_procs(total_workers);
 	cfg_register_child(total_workers);
@@ -208,15 +161,6 @@ static int mod_init(void)
 	return 0;
 }
 
-int mod_register(char *path, int *dlflags, void *p1, void *p2)
-{
-	if (nsq_tr_init_buffers() < 0) {
-		LM_ERR("failed to initialize transformations buffers\n");
-		return -1;
-	}
-	return register_trans_mod(path, mod_trans);
-}
-
 /**
  *
  */
@@ -255,7 +199,7 @@ static int mod_child_init(int rank)
 {
 	int pid;
 	int i;
-	int workers = dbn_consumer_workers / nsq_topic_channel_counter;
+	int workers = nsq_consumer_workers / nsq_topic_channel_counter;
 	int max_in_flight = 1;
 
 	if (nsq_max_in_flight > 1) {
@@ -304,24 +248,6 @@ static int mod_child_init(int rank)
 		return 0;
 	}
 
-	if (dbn_pua_mode == 1) {
-		if (nsq_pa_dbf.init == 0) {
-			LM_CRIT("child_init: database not bound\n");
-			return -1;
-		}
-		nsq_pa_db = nsq_pa_dbf.init(&nsq_db_url);
-		if (!nsq_pa_db) {
-			LM_ERR("child %d: unsuccessful connecting to database\n", rank);
-			return -1;
-		}
-
-		if (nsq_pa_dbf.use_table(nsq_pa_db, &nsq_presentity_table) < 0) {
-			LM_ERR( "child %d:unsuccessful use_table presentity_table\n", rank);
-			return -1;
-		}
-		LM_DBG("child %d: Database connection opened successfully\n", rank);
-	}
-
 	return 0;
 }
 

+ 2 - 21
src/modules/nsq/nsq_mod.h

@@ -27,14 +27,8 @@
 #ifndef __NSQ_MOD_H_
 #define __NSQ_MOD_H_
 
-#include <sys/types.h>
-#include <sys/wait.h>
-
 #include "../../core/cfg/cfg_struct.h"
-#include "../../lib/srdb1/db.h"
 #include "nsq_reader.h"
-#include "nsq_trans.h"
-#include "nsq_pua.h"
 
 #define DBN_DEFAULT_NO_WORKERS 4
 #define LOOKUPD_ADDRESS "127.0.0.1"
@@ -43,7 +37,6 @@
 #define DEFAULT_CHANNEL "Kamailio-Channel"
 #define DEFAULT_TOPIC "Kamailio-Topic"
 #define NSQD_ADDRESS "127.0.0.1"
-#define PRESENTITY_TABLE "presentity"
 
 typedef struct nsq_topic_channel
 {
@@ -62,24 +55,12 @@ str nsq_event_key = str_init(CONSUMER_EVENT_KEY);
 str nsq_event_sub_key = str_init(CONSUMER_EVENT_SUB_KEY);
 str nsqd_address = str_init(NSQD_ADDRESS);
 int nsqd_port = 4150;
-int dbn_pua_mode = 1;
-int dbn_include_entity = 1;
+json_api_t json_api;
 
 nsq_topic_channel_t *tc_list = NULL;
-str nsq_json_escape_str = str_init("%");
-char nsq_json_escape_char = '%';
 
 int nsq_topic_channel_counter = 0;
-int dbn_consumer_workers = DBN_DEFAULT_NO_WORKERS;
-int startup_time = 0;
-
-/* database connection */
-db1_con_t *nsq_pa_db = NULL;
-db_func_t nsq_pa_dbf;
-str nsq_presentity_table = str_init(PRESENTITY_TABLE);
-str nsq_db_url = {NULL, 0};
-int db_table_lock_type = 1;
-db_locking_t db_table_lock = DB_LOCKING_WRITE;
+int nsq_consumer_workers = DBN_DEFAULT_NO_WORKERS;
 
 static int mod_init(void);
 static int mod_child_init(int);

+ 0 - 537
src/modules/nsq/nsq_pua.c

@@ -1,537 +0,0 @@
-/*
- * NSQ module interface
- *
- * Copyright (C) 2010-2014 2600Hz
- *
- * This file is part of Kamailio, a free SIP server.
- *
- * Kamailio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version
- *
- * Kamailio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- *
- * Contributor(s):
- * Emmanuel Schmidbauer <[email protected]>
- *
- */
-
-#include "../presence/bind_presence.h"
-#include "defs.h"
-#include "nsq_json.h"
-#include "nsq_pua.h"
-
-extern db1_con_t *nsq_pa_db;
-extern db_func_t nsq_pa_dbf;
-extern db_locking_t db_table_lock;
-extern str nsq_presentity_table;
-extern str nsq_db_url;
-
-extern int dbn_include_entity;
-extern int dbn_pua_mode;
-
-str str_event_message_summary = str_init("message-summary");
-str str_event_dialog = str_init("dialog");
-str str_event_presence = str_init("presence");
-
-str str_username_col = str_init("username");
-str str_domain_col = str_init("domain");
-str str_body_col = str_init("body");
-str str_expires_col = str_init("expires");
-str str_received_time_col = str_init("received_time");
-str str_presentity_uri_col = str_init("presentity_uri");
-str str_priority_col = str_init("priority");
-
-str str_event_col = str_init("event");
-str str_contact_col = str_init("contact");
-str str_callid_col = str_init("callid");
-str str_from_tag_col = str_init("from_tag");
-str str_to_tag_col = str_init("to_tag");
-str str_etag_col = str_init("etag");
-str str_sender_col = str_init("sender");
-
-str str_presence_note_busy = str_init("Busy");
-str str_presence_note_otp = str_init("On the Phone");
-str str_presence_note_idle = str_init("Idle");
-str str_presence_note_offline = str_init("Offline");
-str str_presence_act_busy = str_init("<rpid:busy/>");
-str str_presence_act_otp = str_init("<rpid:on-the-phone/>");
-str str_presence_status_offline = str_init("closed");
-str str_presence_status_online = str_init("open");
-
-str str_null_string = str_init("NULL");
-
-int nsq_pua_update_presentity(str* event, str* realm, str* user, str* etag, str* sender, str* body, int expires, int reset)
-{
-	db_key_t query_cols[13];
-	db_op_t  query_ops[13];
-	db_val_t query_vals[13];
-	int n_query_cols = 0;
-	int ret = -1;
-	int use_replace = 1;
-
-	query_cols[n_query_cols] = &str_event_col;
-	query_ops[n_query_cols] = OP_EQ;
-	query_vals[n_query_cols].type = DB1_STR;
-	query_vals[n_query_cols].nul = 0;
-	query_vals[n_query_cols].val.str_val = *event;
-	n_query_cols++;
-
-	query_cols[n_query_cols] = &str_domain_col;
-	query_ops[n_query_cols] = OP_EQ;
-	query_vals[n_query_cols].type = DB1_STR;
-	query_vals[n_query_cols].nul = 0;
-	query_vals[n_query_cols].val.str_val = *realm;
-	n_query_cols++;
-
-	query_cols[n_query_cols] = &str_username_col;
-	query_ops[n_query_cols] = OP_EQ;
-	query_vals[n_query_cols].type = DB1_STR;
-	query_vals[n_query_cols].nul = 0;
-	query_vals[n_query_cols].val.str_val = *user;
-	n_query_cols++;
-
-	query_cols[n_query_cols] = &str_etag_col;
-	query_ops[n_query_cols] = OP_EQ;
-	query_vals[n_query_cols].type = DB1_STR;
-	query_vals[n_query_cols].nul = 0;
-	query_vals[n_query_cols].val.str_val = *etag;
-	n_query_cols++;
-
-	query_cols[n_query_cols] = &str_sender_col;
-	query_vals[n_query_cols].type = DB1_STR;
-	query_vals[n_query_cols].nul = 0;
-	query_vals[n_query_cols].val.str_val = *sender;
-	n_query_cols++;
-
-	query_cols[n_query_cols] = &str_body_col;
-	query_vals[n_query_cols].type = DB1_BLOB;
-	query_vals[n_query_cols].nul = 0;
-	query_vals[n_query_cols].val.str_val = *body;
-	n_query_cols++;
-
-	query_cols[n_query_cols] = &str_received_time_col;
-	query_vals[n_query_cols].type = DB1_INT;
-	query_vals[n_query_cols].nul = 0;
-	query_vals[n_query_cols].val.int_val = (int)time(NULL);
-	n_query_cols++;
-
-	query_cols[n_query_cols] = &str_expires_col;
-	query_vals[n_query_cols].type = DB1_INT;
-	query_vals[n_query_cols].nul = 0;
-	query_vals[n_query_cols].val.int_val = expires;
-	n_query_cols++;
-
-	query_cols[n_query_cols] = &str_priority_col;
-	query_vals[n_query_cols].type = DB1_INT;
-	query_vals[n_query_cols].nul = 0;
-	query_vals[n_query_cols].val.int_val = 0;
-	n_query_cols++;
-
-	if (nsq_pa_dbf.use_table(nsq_pa_db, &nsq_presentity_table) < 0) {
-		LM_ERR("unsuccessful use_table [%.*s]\n", nsq_presentity_table.len, nsq_presentity_table.s);
-		goto error;
-	}
-
-	if (nsq_pa_dbf.replace == NULL || reset > 0) {
-		use_replace = 0;
-		LM_DBG("using delete/insert instead of replace\n");
-	}
-
-	if (nsq_pa_dbf.start_transaction) {
-		if (nsq_pa_dbf.start_transaction(nsq_pa_db, db_table_lock) < 0) {
-			LM_ERR("in start_transaction\n");
-			goto error;
-		}
-	}
-
-	if (use_replace) {
-		if (nsq_pa_dbf.replace(nsq_pa_db, query_cols, query_vals, n_query_cols, 4, 0) < 0) {
-			LM_ERR("replacing record in database\n");
-			if (nsq_pa_dbf.abort_transaction) {
-				if (nsq_pa_dbf.abort_transaction(nsq_pa_db) < 0) {
-					LM_ERR("in abort_transaction\n");
-				}
-			}
-			goto error;
-		}
-	} else {
-		if (nsq_pa_dbf.delete(nsq_pa_db, query_cols, query_ops, query_vals, 4-reset) < 0) {
-			LM_ERR("deleting record in database\n");
-			if (nsq_pa_dbf.abort_transaction) {
-				if (nsq_pa_dbf.abort_transaction(nsq_pa_db) < 0)
-					LM_ERR("in abort_transaction\n");
-			}
-			goto error;
-		}
-		if (nsq_pa_dbf.insert(nsq_pa_db, query_cols, query_vals, n_query_cols) < 0) {
-			LM_ERR("replacing record in database\n");
-			if (nsq_pa_dbf.abort_transaction) {
-				if (nsq_pa_dbf.abort_transaction(nsq_pa_db) < 0) {
-					LM_ERR("in abort_transaction\n");
-				}
-			}
-			goto error;
-		}
-	}
-
-	if (nsq_pa_dbf.end_transaction) {
-		if (nsq_pa_dbf.end_transaction(nsq_pa_db) < 0) {
-			LM_ERR("in end_transaction\n");
-			goto error;
-		}
-	}
-
-error:
-
-	return ret;
-}
-
-int nsq_pua_publish_presence_to_presentity(struct json_object *json_obj) {
-	int ret = 1;
-	str from = { 0, 0 }, to = { 0, 0 };
-	str from_user = { 0, 0 }, to_user = { 0, 0 };
-	str from_realm = { 0, 0 }, to_realm = { 0, 0 };
-	str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
-	str state = { 0, 0 };
-	str direction = { 0, 0 };
-	str event = str_init("presence");
-	str presence_body = { 0, 0 };
-	str activity = str_init("");
-	str note = str_init("Available");
-	str status = str_presence_status_online;
-	int expires = 0;
-
-	char *body = (char *)pkg_malloc(PRESENCE_BODY_BUFFER_SIZE);
-	if (body == NULL) {
-		LM_ERR("Error allocating buffer for publish\n");
-		ret = -1;
-		goto error;
-	}
-
-	json_extract_field(BLF_JSON_FROM, from);
-	json_extract_field(BLF_JSON_FROM_USER, from_user);
-	json_extract_field(BLF_JSON_FROM_REALM, from_realm);
-	json_extract_field(BLF_JSON_TO, to);
-	json_extract_field(BLF_JSON_TO_USER, to_user);
-	json_extract_field(BLF_JSON_TO_REALM, to_realm);
-	json_extract_field(BLF_JSON_CALLID, callid);
-	json_extract_field(BLF_JSON_FROMTAG, fromtag);
-	json_extract_field(BLF_JSON_TOTAG, totag);
-	json_extract_field(BLF_JSON_DIRECTION, direction);
-	json_extract_field(BLF_JSON_STATE, state);
-
-	struct json_object *ExpiresObj =  nsq_json_get_object(json_obj, BLF_JSON_EXPIRES);
-	if (ExpiresObj != NULL) {
-		expires = json_object_get_int(ExpiresObj);
-		if (expires > 0)
-			expires += (int)time(NULL);
-	}
-
-	if (!from_user.len || !to_user.len || !state.len) {
-		LM_ERR("missing one of From / To / State\n");
-		goto error;
-	}
-
-	if (!strcmp(state.s, "early")) {
-		note = str_presence_note_busy;
-		activity = str_presence_act_busy;
-
-	} else if (!strcmp(state.s, "confirmed")) {
-		note = str_presence_note_otp;
-		activity = str_presence_act_otp;
-
-	} else if (!strcmp(state.s, "offline")) {
-		note = str_presence_note_offline;
-		status = str_presence_status_offline;
-
-	}; // else {
-	//	note = str_presence_note_idle;
-	//}
-
-
-	sprintf(body, PRESENCE_BODY, from_user.s, callid.s, status.s, note.s, activity.s, note.s);
-
-	presence_body.s = body;
-	presence_body.len = strlen(body);
-
-	if (dbn_pua_mode == 1) {
-		nsq_pua_update_presentity(&event, &from_realm, &from_user, &callid, &from, &presence_body, expires, 1);
-	}
-
- error:
-
- if (body)
-	  pkg_free(body);
-
- return ret;
-
-}
-
-int nsq_pua_publish_mwi_to_presentity(struct json_object *json_obj) {
-	int ret = 1;
-	str event = str_init("message-summary");
-	str from = { 0, 0 }, to = { 0, 0 };
-	str from_user = { 0, 0 }, to_user = { 0, 0 };
-	str from_realm = { 0, 0 }, to_realm = { 0, 0 };
-	str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
-	str mwi_user = { 0, 0 }, mwi_waiting = { 0, 0 },
-		mwi_voice_message = { 0, 0 }, mwi_new = { 0, 0 }, mwi_saved = { 0, 0 },
-		mwi_urgent = { 0, 0 }, mwi_urgent_saved = { 0, 0 },
-		mwi_account = { 0, 0 }, mwi_body = { 0, 0 };
-	int expires = 0;
-
-	char *body = (char *)pkg_malloc(MWI_BODY_BUFFER_SIZE);
-	if (body == NULL) {
-		LM_ERR("Error allocating buffer for publish\n");
-		ret = -1;
-		goto error;
-	}
-
-	json_extract_field(BLF_JSON_FROM, from);
-	json_extract_field(BLF_JSON_FROM_USER, from_user);
-	json_extract_field(BLF_JSON_FROM_REALM, from_realm);
-	json_extract_field(BLF_JSON_TO, to);
-	json_extract_field(BLF_JSON_TO_USER, to_user);
-	json_extract_field(BLF_JSON_TO_REALM, to_realm);
-	json_extract_field(BLF_JSON_CALLID, callid);
-	json_extract_field(BLF_JSON_FROMTAG, fromtag);
-	json_extract_field(BLF_JSON_TOTAG, totag);
-
-	json_extract_field(MWI_JSON_TO, mwi_user);
-	json_extract_field(MWI_JSON_WAITING, mwi_waiting);
-	json_extract_field(MWI_JSON_VOICE_MESSAGE, mwi_voice_message);
-	json_extract_field(MWI_JSON_NEW, mwi_new);
-	json_extract_field(MWI_JSON_SAVED, mwi_saved);
-	json_extract_field(MWI_JSON_URGENT, mwi_urgent);
-	json_extract_field(MWI_JSON_URGENT_SAVED, mwi_urgent_saved);
-	json_extract_field(MWI_JSON_ACCOUNT, mwi_account);
-
-	struct json_object *ExpiresObj =  nsq_json_get_object(json_obj, BLF_JSON_EXPIRES);
-	if (ExpiresObj != NULL) {
-		expires = json_object_get_int(ExpiresObj);
-		if (expires > 0)
-			expires += (int)time(NULL);
-	}
-
-	if (mwi_new.len > 0) {
-		sprintf(body, MWI_BODY, mwi_waiting.len, mwi_waiting.s,
-		    mwi_account.len, mwi_account.s, mwi_new.len, mwi_new.s,
-		    mwi_saved.len, mwi_saved.s, mwi_urgent.len, mwi_urgent.s,
-		    mwi_urgent_saved.len, mwi_urgent_saved.s);
-	} else if (mwi_voice_message.len > 0) {
-		sprintf(body, MWI_BODY_VOICE_MESSAGE, mwi_waiting.len, mwi_waiting.s,
-		    mwi_account.len, mwi_account.s, mwi_voice_message.len, mwi_voice_message.s);
-	} else {
-		sprintf(body, MWI_BODY_NO_VOICE_MESSAGE, mwi_waiting.len, mwi_waiting.s,
-		    mwi_account.len, mwi_account.s);
-	}
-
-	mwi_body.s = body;
-	mwi_body.len = strlen(body);
-
-	if (dbn_pua_mode == 1) {
-		nsq_pua_update_presentity(&event, &from_realm, &from_user, &callid, &from, &mwi_body, expires, 1);
-	}
-
- error:
-
-   if (body)
-	  pkg_free(body);
-
-
-   return ret;
-}
-
-
-int nsq_pua_publish_dialoginfo_to_presentity(struct json_object *json_obj) {
-	int ret = 1;
-	str from = { 0, 0 }, to = { 0, 0 }, pres = {0, 0};
-	str from_user = { 0, 0 }, to_user = { 0, 0 }, pres_user = { 0, 0 };
-	str from_realm = { 0, 0 }, to_realm = { 0, 0 }, pres_realm = { 0, 0 };
-	str from_uri = { 0, 0 }, to_uri = { 0, 0 };
-	str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
-	str state = { 0, 0 };
-	str direction = { 0, 0 };
-	char sender_buf[1024];
-	str sender = {0, 0};
-	str dialoginfo_body = {0 , 0};
-	int expires = 0;
-	str event = str_init("dialog");
-	int reset = 0;
-	char to_tag_buffer[100];
-	char from_tag_buffer[100];
-
-	char *body = (char *)pkg_malloc(DIALOGINFO_BODY_BUFFER_SIZE);
-	if (body == NULL) {
-		LM_ERR("Error allocating buffer for publish\n");
-		ret = -1;
-		goto error;
-	}
-
-
-	json_extract_field(BLF_JSON_PRES, pres);
-	json_extract_field(BLF_JSON_PRES_USER, pres_user);
-	json_extract_field(BLF_JSON_PRES_REALM, pres_realm);
-	json_extract_field(BLF_JSON_FROM, from);
-	json_extract_field(BLF_JSON_FROM_USER, from_user);
-	json_extract_field(BLF_JSON_FROM_REALM, from_realm);
-	json_extract_field(BLF_JSON_FROM_URI, from_uri);
-	json_extract_field(BLF_JSON_TO, to);
-	json_extract_field(BLF_JSON_TO_USER, to_user);
-	json_extract_field(BLF_JSON_TO_REALM, to_realm);
-	json_extract_field(BLF_JSON_TO_URI, to_uri);
-	json_extract_field(BLF_JSON_CALLID, callid);
-	json_extract_field(BLF_JSON_FROMTAG, fromtag);
-	json_extract_field(BLF_JSON_TOTAG, totag);
-	json_extract_field(BLF_JSON_DIRECTION, direction);
-	json_extract_field(BLF_JSON_STATE, state);
-
-	struct json_object *ExpiresObj =  nsq_json_get_object(json_obj, BLF_JSON_EXPIRES);
-	if (ExpiresObj != NULL) {
-		expires = json_object_get_int(ExpiresObj);
-		if (expires > 0)
-			expires += (int)time(NULL);
-	}
-
-	ExpiresObj =  nsq_json_get_object(json_obj, "Flush-Level");
-	if (ExpiresObj != NULL) {
-		reset = json_object_get_int(ExpiresObj);
-	}
-
-	if (!from.len || !to.len || !state.len) {
-		LM_ERR("missing one of From / To / State\n");
-		goto error;
-	}
-
-	if (!pres.len || !pres_user.len || !pres_realm.len) {
-		pres = from;
-		pres_user = from_user;
-		pres_realm = from_realm;
-	}
-
-	if (!from_uri.len)
-		from_uri = from;
-
-	if (!to_uri.len)
-		to_uri = to;
-
-	if (fromtag.len > 0) {
-		fromtag.len = sprintf(from_tag_buffer, LOCAL_TAG, fromtag.len, fromtag.s);
-		fromtag.s = from_tag_buffer;
-	}
-
-	if (totag.len > 0) {
-		totag.len = sprintf(to_tag_buffer, REMOTE_TAG, totag.len, totag.s);
-		totag.s = to_tag_buffer;
-	}
-
-	if (callid.len) {
-
-		if (dbn_include_entity) {
-		sprintf(body, DIALOGINFO_BODY,
-				pres.len, pres.s,
-				callid.len, callid.s,
-				callid.len, callid.s,
-				fromtag.len, fromtag.s,
-				totag.len, totag.s,
-				direction.len, direction.s,
-				state.len, state.s,
-				from_user.len, from_user.s,
-				from.len, from.s,
-				from_uri.len, from_uri.s,
-				to_user.len, to_user.s,
-				to.len, to.s,
-				to_uri.len, to_uri.s
-				);
-		} else {
-
-		sprintf(body, DIALOGINFO_BODY_2,
-				pres.len, pres.s,
-				callid.len, callid.s,
-				callid.len, callid.s,
-				fromtag.len, fromtag.s,
-				totag.len, totag.s,
-				direction.len, direction.s,
-				state.len, state.s,
-				from_user.len, from_user.s,
-				from.len, from.s,
-				to_user.len, to_user.s,
-				to.len, to.s
-				);
-		}
-
-	} else {
-		sprintf(body, DIALOGINFO_EMPTY_BODY, pres.len, pres.s);
-	}
-
-
-	sprintf(sender_buf, "sip:%s", callid.s);
-	sender.s = sender_buf;
-	sender.len = strlen(sender_buf);
-
-	dialoginfo_body.s = body;
-	dialoginfo_body.len = strlen(body);
-
-	if (dbn_pua_mode == 1) {
-		nsq_pua_update_presentity(&event, &pres_realm, &pres_user, &callid, &sender, &dialoginfo_body, expires, reset);
-	}
-
- error:
-
-   if (body)
-	  pkg_free(body);
-
-
- return ret;
-}
-
-
-int nsq_pua_publish(struct sip_msg* msg, char *json) {
-	str event_name = { 0, 0 }, event_package = { 0, 0 };
-	struct json_object *json_obj = NULL;
-	int ret = 1;
-
-	if (dbn_pua_mode != 1) {
-		LM_ERR("pua_mode must be 1 to publish\n");
-		ret = -1;
-		goto error;
-	}
-
-	/* extract info from json and construct xml */
-	json_obj = nsq_json_parse(json);
-	if (json_obj == NULL) {
-		ret = -1;
-		goto error;
-	}
-
-	json_extract_field(BLF_JSON_EVENT_NAME, event_name);
-
-	if (event_name.len == 6 && strncmp(event_name.s, "update", 6) == 0) {
-		json_extract_field(BLF_JSON_EVENT_PKG, event_package);
-		if (event_package.len == str_event_dialog.len
-				&& strncmp(event_package.s, str_event_dialog.s, event_package.len) == 0) {
-			ret = nsq_pua_publish_dialoginfo_to_presentity(json_obj);
-		} else if (event_package.len == str_event_message_summary.len
-				&& strncmp(event_package.s, str_event_message_summary.s, event_package.len) == 0) {
-			ret = nsq_pua_publish_mwi_to_presentity(json_obj);
-		} else if (event_package.len == str_event_presence.len
-				&& strncmp(event_package.s, str_event_presence.s, event_package.len) == 0) {
-			ret = nsq_pua_publish_presence_to_presentity(json_obj);
-		}
-	}
-
-error:
-	if (json_obj)
-		json_object_put(json_obj);
-
-	return ret;
-}

+ 0 - 34
src/modules/nsq/nsq_pua.h

@@ -1,34 +0,0 @@
-/*
- * NSQ module interface
- *
- * Copyright (C) 2010-2014 2600Hz
- *
- * This file is part of Kamailio, a free SIP server.
- *
- * Kamailio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version
- *
- * Kamailio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- *
- * Contributor(s):
- * Emmanuel Schmidbauer <[email protected]>
- *
- */
-
-#ifndef __NSQ_PUA_H_
-#define __NSQ_PUA_H_
-
-
-int nsq_pua_publish(struct sip_msg* msg, char *json);
-
-
-#endif

+ 11 - 7
src/modules/nsq/nsq_reader.c

@@ -24,11 +24,14 @@
  *
  */
 
+#include "../json/api.h"
 #include "nsq_reader.h"
 
 char *eventData = NULL;
 
-typedef struct json_object *json_obj_ptr;
+extern json_api_t json_api;
+extern str nsq_event_key;
+extern str nsq_event_sub_key;
 
 int nsq_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
 {
@@ -63,7 +66,7 @@ int nsq_consumer_fire_event(char *routename)
 
 int nsq_consumer_event(char *payload, char *channel, char *topic)
 {
-	json_obj_ptr json_obj = NULL;
+	struct json_object *json_obj = NULL;
 	int ret = 0;
 	str ev_name = {0, 0}, ev_category = {0, 0};
 	char *k = NULL;
@@ -72,21 +75,21 @@ int nsq_consumer_event(char *payload, char *channel, char *topic)
 
 	eventData = payload;
 
-	json_obj = nsq_json_parse(payload);
+	json_obj = json_api.json_parse(payload);
 	if (json_obj == NULL) {
-		return 0;
+		return ret;
 	}
 
 	k = pkg_malloc(nsq_event_key.len+1);
 	memcpy(k, nsq_event_key.s, nsq_event_key.len);
 	k[nsq_event_key.len] = '\0';
-	json_extract_field(k, ev_category);
+	json_api.extract_field(json_obj, k, &ev_category);
 	pkg_free(k);
 
 	k = pkg_malloc(nsq_event_sub_key.len+1);
 	memcpy(k, nsq_event_sub_key.s, nsq_event_sub_key.len);
 	k[nsq_event_sub_key.len] = '\0';
-	json_extract_field(k, ev_name);
+	json_api.extract_field(json_obj, k, &ev_name);
 	pkg_free(k);
 
 	sprintf(buffer, "nsq:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
@@ -114,8 +117,9 @@ int nsq_consumer_event(char *payload, char *channel, char *topic)
 		}
 	}
 
-	if(json_obj)
+	if (json_obj) {
 		json_object_put(json_obj);
+	}
 
 	eventData = NULL;
 

+ 0 - 3
src/modules/nsq/nsq_reader.h

@@ -29,11 +29,8 @@
 
 #include <json.h>
 
-#include "../../core/sr_module.h"
 #include "../../core/fmsg.h"
 #include "nsq.h"
-#include "nsq_json.h"
-
 
 int nsq_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res);
 int nsq_consumer_fire_event(char *routename);

+ 0 - 452
src/modules/nsq/nsq_trans.c

@@ -1,452 +0,0 @@
-/*
- * NSQ module interface
- *
- * Copyright (C) 2010-2014 2600Hz
- *
- * This file is part of Kamailio, a free SIP server.
- *
- * Kamailio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version
- *
- * Kamailio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- *
- * Contributor(s):
- * Emmanuel Schmidbauer <[email protected]>
- *
- */
-
-#include "../../core/trim.h"
-#include "../../core/mod_fix.h"
-
-#include "nsq_trans.h"
-#include "nsq_json.h"
-
-
-/*! transformation buffer size */
-#define NSQ_TR_BUFFER_SIZE 65536
-#define NSQ_TR_BUFFER_SLOTS	4
-
-/*! transformation buffer */
-static char **_nsq_tr_buffer_list = NULL;
-
-static char *_nsq_tr_buffer = NULL;
-
-static int _nsq_tr_buffer_idx = 0;
-
-#define NSQ_TR_ALLOC_PARSE_SIZE	2048
-
-static pv_spec_t**  _nsq_parse_specs  = NULL;
-static tr_param_t** _nsq_parse_params = NULL;
-static int _nsq_tr_parse_spec = 0;
-static int _nsq_tr_parse_params = 0;
-
-
-/*!
- *
- */
-int nsq_tr_init_buffers(void)
-{
-	int i;
-
-	_nsq_tr_buffer_list = (char**)malloc(NSQ_TR_BUFFER_SLOTS * sizeof(char*));
-
-	if (_nsq_tr_buffer_list==NULL)
-		return -1;
-	for (i=0; i<NSQ_TR_BUFFER_SLOTS; i++) {
-		_nsq_tr_buffer_list[i] = (char*)malloc(NSQ_TR_BUFFER_SIZE);
-		if(_nsq_tr_buffer_list[i]==NULL)
-			return -1;
-	}
-
-	_nsq_parse_specs = (pv_spec_t**)malloc(NSQ_TR_ALLOC_PARSE_SIZE * sizeof(pv_spec_t*));
-	for (i=0; i < NSQ_TR_ALLOC_PARSE_SIZE; i++)
-		_nsq_parse_specs[i] = NULL;
-
-	_nsq_parse_params = (tr_param_t**)malloc(NSQ_TR_ALLOC_PARSE_SIZE * sizeof(tr_param_t*));
-	for (i=0; i < NSQ_TR_ALLOC_PARSE_SIZE; i++)
-		_nsq_parse_params[i] = NULL;
-
-	return 0;
-}
-
-void nsq_tr_clear_buffers(void)
-{
-	int i;
-	if (_nsq_tr_buffer_list != NULL) {
-		for (i=0; i<NSQ_TR_BUFFER_SLOTS; i++) {
-			if (_nsq_tr_buffer_list[i] != NULL) {
-				free(_nsq_tr_buffer_list[i]);
-				_nsq_tr_buffer_list[i] = NULL;
-			}
-		}
-		free(_nsq_tr_buffer_list);
-		_nsq_tr_buffer_list = NULL;
-	}
-
-	if (_nsq_parse_specs != NULL) {
-		for (i=0; i<NSQ_TR_ALLOC_PARSE_SIZE; i++) {
-			if (_nsq_parse_specs[i] != NULL) {
-				free(_nsq_parse_specs[i]);
-				_nsq_parse_specs[i] = NULL;
-			}
-		}
-		free(_nsq_parse_specs);
-		_nsq_parse_specs = NULL;
-	}
-
-	if (_nsq_parse_params != NULL) {
-		for (i=0; i<NSQ_TR_ALLOC_PARSE_SIZE; i++) {
-			if (_nsq_parse_params[i] != NULL) {
-				free(_nsq_parse_params[i]);
-				_nsq_parse_params[i] = NULL;
-			}
-		}
-		free(_nsq_parse_params);
-		_nsq_parse_params = NULL;
-	}
-
-}
-
-char *nsq_tr_set_crt_buffer(void)
-{
-	_nsq_tr_buffer = _nsq_tr_buffer_list[_nsq_tr_buffer_idx];
-	_nsq_tr_buffer_idx = (_nsq_tr_buffer_idx + 1) % NSQ_TR_BUFFER_SLOTS;
-	return _nsq_tr_buffer;
-}
-
-#define nsq_tr_string_clone_result do { \
-		if(val->rs.len> NSQ_TR_BUFFER_SIZE-1) { \
-			LM_ERR("result is too big\n"); \
-			return -1; \
-		} \
-		strncpy(_nsq_tr_buffer, val->rs.s, val->rs.len); \
-		val->rs.s = _nsq_tr_buffer; \
-	} while(0);
-
-void nsq_destroy_pv_value(pv_value_t *val)
-{
-	if (val->flags & PV_VAL_PKG)
-		pkg_free(val->rs.s);
-	else if (val->flags & PV_VAL_SHM)
-		shm_free(val->rs.s);
-	pkg_free(val);
-}
-
-void nsq_free_pv_value(pv_value_t *val )
-{
-	if (val->flags & PV_VAL_PKG)
-		pkg_free(val->rs.s);
-	else if (val->flags & PV_VAL_SHM)
-		shm_free(val->rs.s);
-}
-
-pv_value_t* nsq_alloc_pv_value() {
-	pv_value_t *v = (pv_value_t*) pkg_malloc(sizeof(pv_value_t));
-	if (v != NULL)
-		memset(v, 0, sizeof(pv_value_t));
-	return v;
-}
-
-#define KEY_SAFE(C)  ((C >= 'a' && C <= 'z') || \
-					  (C >= 'A' && C <= 'Z') || \
-					  (C >= '0' && C <= '9') || \
-					  (C == '-' || C == '~'  || C == '_'))
-
-#define HI4(C) (C>>4)
-#define LO4(C) (C & 0x0F)
-
-#define hexint(C) (C < 10?('0' + C):('A'+ C - 10))
-
-char *nsq_util_encode(const str * key, char *dest) {
-	if ((key->len == 1) && (key->s[0] == '#' || key->s[0] == '*')) {
-		*dest++ = key->s[0];
-		return dest;
-	}
-	char *p, *end;
-	for (p = key->s, end = key->s + key->len; p < end; p++) {
-		if (KEY_SAFE(*p)) {
-			*dest++ = *p;
-		} else if (*p == '.') {
-			memcpy(dest, "\%2E", 3);
-			dest += 3;
-		} else if (*p == ' ') {
-			*dest++ = '+';
-		} else {
-			*dest++ = '%';
-			sprintf(dest, "%c%c", hexint(HI4(*p)), hexint(LO4(*p)));
-			dest += 2;
-		}
-	}
-	*dest = '\0';
-	return dest;
-}
-
-int nsq_encode_ex(str *unencoded, pv_value_p dst_val)
-{
-	char routing_key_buff[256];
-	memset(routing_key_buff,0, sizeof(routing_key_buff));
-	nsq_util_encode(unencoded, routing_key_buff);
-
-	int len = strlen(routing_key_buff);
-	dst_val->rs.s = pkg_malloc(len + 1);
-	memcpy(dst_val->rs.s, routing_key_buff, len);
-	dst_val->rs.s[len] = '\0';
-	dst_val->rs.len = len;
-	dst_val->flags = PV_VAL_STR | PV_VAL_PKG;
-
-	return 1;
-
-}
-
-/*!
- * \brief Evaluate NSQ transformations
- * \param msg SIP message
- * \param tp transformation
- * \param subtype transformation type
- * \param val pseudo-variable
- * \return 0 on success, -1 on error
- */
-int nsq_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val)
-{
-
-	str sv;
-	pv_value_t *pv;
-	pv_value_t v;
-	str v2 = {0,0};
-	void* v1 = NULL;
-
-	if (val==NULL || (val->flags&PV_VAL_NULL))
-		return -1;
-
-
-	nsq_tr_set_crt_buffer();
-
-	switch (subtype) {
-		case TR_NSQ_ENCODE:
-			if (!(val->flags&PV_VAL_STR))
-				return -1;
-
-			pv = nsq_alloc_pv_value();
-			if (pv == NULL) {
-				LM_ERR("NSQ encode transform : no more private memory\n");
-				return -1;
-			}
-
-			if (nsq_encode_ex(&val->rs, pv ) != 1) {
-				LM_ERR("error encoding value\n");
-				nsq_destroy_pv_value(pv);
-				return -1;
-			}
-
-			strncpy(_nsq_tr_buffer, pv->rs.s, pv->rs.len);
-			_nsq_tr_buffer[pv->rs.len] = '\0';
-
-			val->flags = PV_VAL_STR;
-			val->ri = 0;
-			val->rs.s = _nsq_tr_buffer;
-			val->rs.len = pv->rs.len;
-
-			nsq_destroy_pv_value(pv);
-			nsq_free_pv_value(val);
-
-			break;
-		case TR_NSQ_JSON:
-			if (!(val->flags&PV_VAL_STR))
-				return -1;
-
-			if (tp == NULL) {
-				LM_ERR("NSQ json transform invalid parameter\n");
-				return -1;
-			}
-
-			pv = nsq_alloc_pv_value();
-			if (pv == NULL) {
-				LM_ERR("NSQ encode transform : no more private memory\n");
-				return -1;
-			}
-
-
-			if (tp->type == TR_PARAM_STRING) {
-				v1 = tp->v.s.s;
-				if (fixup_spve_null(&v1, 1) != 0) {
-					LM_ERR("cannot get spve_value from TR_PARAM_STRING : %.*s\n", tp->v.s.len, tp->v.s.s);
-					return -1;
-				}
-				if (fixup_get_svalue(msg, (gparam_p)v1, &v2) != 0) {
-					LM_ERR("cannot get value from TR_PARAM_STRING\n");
-					fixup_free_spve_null(&v1, 1);
-					return -1;
-				}
-				fixup_free_spve_null(&v1, 1);
-				sv = v2;
-			} else {
-				if (pv_get_spec_value(msg, (pv_spec_p)tp->v.data, &v)!=0
-						|| (!(v.flags&PV_VAL_STR)) || v.rs.len<=0) {
-					LM_ERR("value cannot get spec value in json transform\n");
-					nsq_destroy_pv_value(pv);
-					return -1;
-				}
-				sv = v.rs;
-			}
-
-
-			if (nsq_json_get_field_ex(&val->rs, &sv, pv ) != 1) {
-				LM_ERR("error getting json\n");
-				nsq_destroy_pv_value(pv);
-				return -1;
-			}
-
-			strncpy(_nsq_tr_buffer, pv->rs.s, pv->rs.len);
-			_nsq_tr_buffer[pv->rs.len] = '\0';
-
-			val->flags = PV_VAL_STR;
-			val->ri = 0;
-			val->rs.s = _nsq_tr_buffer;
-			val->rs.len = pv->rs.len;
-
-			nsq_destroy_pv_value(pv);
-			nsq_free_pv_value(val);
-
-			break;
-
-		default:
-			LM_ERR("unknown NSQ transformation subtype %d\n", subtype);
-			return -1;
-	}
-	return 0;
-}
-
-#define _nsq_tr_parse_sparam(_p, _p0, _tp, _spec, _ps, _in, _s) \
-	while(is_in_str(_p, _in) && (*_p==' ' || *_p=='\t' || *_p=='\n')) _p++; \
-	if(*_p==PV_MARKER) \
-	{ /* pseudo-variable */ \
-		_spec = (pv_spec_t*)malloc(sizeof(pv_spec_t)); \
-		if(_spec==NULL) \
-		{ \
-			LM_ERR("no more private memory!\n"); \
-			goto error; \
-		} \
-		_s.s = _p; _s.len = _in->s + _in->len - _p; \
-		_p0 = pv_parse_spec(&_s, _spec); \
-		if(_p0==NULL) \
-		{ \
-			LM_ERR("invalid spec in substr transformation: %.*s!\n", \
-				_in->len, _in->s); \
-			goto error; \
-		} \
-		_p = _p0; \
-		_tp = (tr_param_t*)malloc(sizeof(tr_param_t)); \
-		if(_tp==NULL) \
-		{ \
-			LM_ERR("no more private memory!\n"); \
-			goto error; \
-		} \
-		memset(_tp, 0, sizeof(tr_param_t)); \
-		_tp->type = TR_PARAM_SPEC; \
-		_tp->v.data = (void*)_spec; \
-		_nsq_parse_specs[_nsq_tr_parse_spec++] = _spec; \
-		_nsq_parse_params[_nsq_tr_parse_params++] = _tp; \
-	} else { /* string */ \
-		_ps = _p; \
-		while(is_in_str(_p, _in) && *_p!='\t' && *_p!='\n' \
-				&& *_p!=TR_PARAM_MARKER && *_p!=TR_RBRACKET) \
-				_p++; \
-		if(*_p=='\0') \
-		{ \
-			LM_ERR("invalid param in transformation: %.*s!!\n", \
-				_in->len, _in->s); \
-			goto error; \
-		} \
-		_tp = (tr_param_t*)malloc(sizeof(tr_param_t)); \
-		if(_tp==NULL) \
-		{ \
-			LM_ERR("no more private memory!\n"); \
-			goto error; \
-		} \
-		memset(_tp, 0, sizeof(tr_param_t)); \
-		_tp->type = TR_PARAM_STRING; \
-		_tp->v.s.len = _p - _ps; \
-		_tp->v.s.s = (char*)malloc((tp->v.s.len+1)*sizeof(char)); \
-		strncpy(_tp->v.s.s, _ps, tp->v.s.len); \
-		_tp->v.s.s[tp->v.s.len] = '\0'; \
-		_nsq_parse_params[_nsq_tr_parse_params++] = _tp; \
-	}
-
-
-/*!
- * \brief Helper fuction to parse a NSQ transformation
- * \param in parsed string
- * \param t transformation
- * \return pointer to the end of the transformation in the string - '}', null on error
- */
-char* nsq_tr_parse(str* in, trans_t *t)
-{
-	char *p;
-	char *p0;
-	char *ps;
-	str name;
-	str s;
-	pv_spec_t *spec = NULL;
-	tr_param_t *tp = NULL;
-
-	if(in==NULL || t==NULL)
-		return NULL;
-
-	p = in->s;
-	name.s = in->s;
-	t->type = TR_NSQ;
-	t->trf = nsq_tr_eval;
-
-	/* find next token */
-	while(is_in_str(p, in) && *p!=TR_PARAM_MARKER && *p!=TR_RBRACKET) p++;
-	if (*p=='\0') {
-		LM_ERR("invalid transformation: %.*s\n",
-				in->len, in->s);
-		goto error;
-	}
-	name.len = p - name.s;
-	trim(&name);
-
-	if (name.len==6 && strncasecmp(name.s, "encode", 6)==0) {
-		t->subtype = TR_NSQ_ENCODE;
-		goto done;
-	} else if (name.len==4 && strncasecmp(name.s, "json", 4)==0) {
-		t->subtype = TR_NSQ_JSON;
-		if (*p!=TR_PARAM_MARKER) {
-			LM_ERR("invalid json transformation: %.*s!\n", in->len, in->s);
-			goto error;
-		}
-		p++;
-		_nsq_tr_parse_sparam(p, p0, tp, spec, ps, in, s);
-		t->params = tp;
-		tp = 0;
-		while(*p && (*p==' ' || *p=='\t' || *p=='\n')) p++;
-		if (*p!=TR_RBRACKET) {
-			LM_ERR("invalid json transformation: %.*s!!\n",
-				in->len, in->s);
-			goto error;
-		}
-		goto done;
-	}
-
-	LM_ERR("unknown NSQ transformation: %.*s/%.*s/%d!\n", in->len, in->s,
-			name.len, name.s, name.len);
-error:
-	if(tp)
-		free(tp);
-	if(spec)
-		free(spec);
-	return NULL;
-done:
-	t->name = name;
-	return p;
-}

+ 0 - 42
src/modules/nsq/nsq_trans.h

@@ -1,42 +0,0 @@
-/*
- * NSQ module interface
- *
- * Copyright (C) 2010-2014 2600Hz
- *
- * This file is part of Kamailio, a free SIP server.
- *
- * Kamailio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version
- *
- * Kamailio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- *
- * Contributor(s):
- * Emmanuel Schmidbauer <[email protected]>
- *
- */
-
-#ifndef _NSQ_TRANS_H_
-#define _NSQ_TRANS_H_
-
-#include "../../core/pvar.h"
-
-
-enum _nsq_tr_type { TR_NONE=0, TR_NSQ };
-enum _nsq_tr_subtype { TR_NSQ_NONE=0, TR_NSQ_ENCODE, TR_NSQ_JSON };
-
-char *nsq_tr_parse(str *in, trans_t *tr);
-
-int nsq_tr_init_buffers(void);
-void nsq_tr_clear_buffers(void);
-
-
-#endif