|
@@ -4,7 +4,7 @@
|
|
#include <amqp.h>
|
|
#include <amqp.h>
|
|
#include <amqp_framing.h>
|
|
#include <amqp_framing.h>
|
|
#include <amqp_tcp_socket.h>
|
|
#include <amqp_tcp_socket.h>
|
|
-#include <json/json.h>
|
|
|
|
|
|
+#include <json.h>
|
|
#include <uuid/uuid.h>
|
|
#include <uuid/uuid.h>
|
|
#include "../../mem/mem.h"
|
|
#include "../../mem/mem.h"
|
|
#include "../../timer_proc.h"
|
|
#include "../../timer_proc.h"
|
|
@@ -15,6 +15,7 @@
|
|
|
|
|
|
|
|
|
|
#include "kz_amqp.h"
|
|
#include "kz_amqp.h"
|
|
|
|
+#include "kz_json.h"
|
|
|
|
|
|
#define RET_AMQP_ERROR 2
|
|
#define RET_AMQP_ERROR 2
|
|
|
|
|
|
@@ -43,6 +44,10 @@ extern int dbk_consumer_ack_loop_count;
|
|
extern int dbk_single_consumer_on_reconnect;
|
|
extern int dbk_single_consumer_on_reconnect;
|
|
extern int dbk_consume_messages_on_reconnect;
|
|
extern int dbk_consume_messages_on_reconnect;
|
|
|
|
|
|
|
|
+const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL };
|
|
|
|
+const amqp_table_t kz_amqp_empty_table = { 0, NULL };
|
|
|
|
+
|
|
|
|
+
|
|
static char *kz_amqp_str_dup(str *src)
|
|
static char *kz_amqp_str_dup(str *src)
|
|
{
|
|
{
|
|
char *res;
|
|
char *res;
|
|
@@ -599,8 +604,6 @@ void kz_amqp_add_payload_common_properties(json_obj_ptr json_obj, char* server_i
|
|
sprintf(node_name, "kamailio@%.*s", dbk_node_hostname.len, dbk_node_hostname.s);
|
|
sprintf(node_name, "kamailio@%.*s", dbk_node_hostname.len, dbk_node_hostname.s);
|
|
json_object_object_add(json_obj, BLF_JSON_NODE,
|
|
json_object_object_add(json_obj, BLF_JSON_NODE,
|
|
json_object_new_string(node_name));
|
|
json_object_new_string(node_name));
|
|
-// json_object_object_add(json_obj, BLF_JSON_SERVERID,
|
|
|
|
-// json_object_new_string(server_id));
|
|
|
|
json_object_object_add(json_obj, BLF_JSON_MSG_ID,
|
|
json_object_object_add(json_obj, BLF_JSON_MSG_ID,
|
|
json_object_new_string_len(unique->s, unique->len));
|
|
json_object_new_string_len(unique->s, unique->len));
|
|
|
|
|
|
@@ -627,13 +630,9 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
|
|
|
|
|
|
|
|
|
|
/* parse json and add extra fields */
|
|
/* parse json and add extra fields */
|
|
- json_obj = json_tokener_parse(str_payload->s);
|
|
|
|
- if (is_error(json_obj))
|
|
|
|
- {
|
|
|
|
- LM_ERR("Error parsing json: %s\n",json_tokener_errors[-(unsigned long)json_obj]);
|
|
|
|
- LM_ERR("%s\n", str_payload->s);
|
|
|
|
- goto error;
|
|
|
|
- }
|
|
|
|
|
|
+ json_obj = kz_json_parse(str_payload->s);
|
|
|
|
+ if (json_obj == NULL)
|
|
|
|
+ goto error;
|
|
|
|
|
|
kz_amqp_add_payload_common_properties(json_obj, serverid, &unique_string);
|
|
kz_amqp_add_payload_common_properties(json_obj, serverid, &unique_string);
|
|
|
|
|
|
@@ -701,13 +700,9 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
|
|
|
|
|
|
|
|
|
|
/* parse json and add extra fields */
|
|
/* parse json and add extra fields */
|
|
- json_obj = json_tokener_parse(str_payload->s);
|
|
|
|
- if (is_error(json_obj))
|
|
|
|
- {
|
|
|
|
- LM_ERR("Error parsing json: %s\n",json_tokener_errors[-(unsigned long)json_obj]);
|
|
|
|
- LM_ERR("%s\n", str_payload->s);
|
|
|
|
- goto error;
|
|
|
|
- }
|
|
|
|
|
|
+ json_obj = kz_json_parse(str_payload->s);
|
|
|
|
+ if (json_obj == NULL)
|
|
|
|
+ goto error;
|
|
|
|
|
|
kz_amqp_add_payload_common_properties(json_obj, serverid, &unique_string);
|
|
kz_amqp_add_payload_common_properties(json_obj, serverid, &unique_string);
|
|
|
|
|
|
@@ -743,13 +738,9 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
|
|
lock_get(&cmd->lock);
|
|
lock_get(&cmd->lock);
|
|
switch(cmd->return_code) {
|
|
switch(cmd->return_code) {
|
|
case AMQP_RESPONSE_NORMAL:
|
|
case AMQP_RESPONSE_NORMAL:
|
|
- json_body = json_tokener_parse(cmd->return_payload);
|
|
|
|
- if (is_error(json_body))
|
|
|
|
- {
|
|
|
|
- LM_ERR("Error parsing body json: %s\n",json_tokener_errors[-(unsigned long)json_body]);
|
|
|
|
- LM_ERR("JSON : %s\n", cmd->return_payload);
|
|
|
|
- goto error;
|
|
|
|
- }
|
|
|
|
|
|
+ json_body = kz_json_parse(cmd->return_payload);
|
|
|
|
+ if (json_body == NULL)
|
|
|
|
+ goto error;
|
|
*json_ret = json_body;
|
|
*json_ret = json_body;
|
|
ret = 0;
|
|
ret = 0;
|
|
break;
|
|
break;
|
|
@@ -979,13 +970,10 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
|
|
- json_obj = json_tokener_parse(payload_s.s);
|
|
|
|
- if (is_error(json_obj))
|
|
|
|
- {
|
|
|
|
- LM_ERR("Error parsing json: %s\n",json_tokener_errors[-(unsigned long)json_obj]);
|
|
|
|
- LM_ERR("%s\n", payload_s.s);
|
|
|
|
- return -1;
|
|
|
|
- }
|
|
|
|
|
|
+ json_obj = kz_json_parse(payload_s.s);
|
|
|
|
+ if (json_obj == NULL)
|
|
|
|
+ return -1;
|
|
|
|
+
|
|
|
|
|
|
json_extract_field("exchange", exchange_s);
|
|
json_extract_field("exchange", exchange_s);
|
|
json_extract_field("type", exchange_type_s);
|
|
json_extract_field("type", exchange_type_s);
|
|
@@ -994,32 +982,32 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
|
|
json_extract_field("event_key", key_s);
|
|
json_extract_field("event_key", key_s);
|
|
json_extract_field("event_subkey", subkey_s);
|
|
json_extract_field("event_subkey", subkey_s);
|
|
|
|
|
|
- tmpObj = json_object_object_get(json_obj, "passive");
|
|
|
|
|
|
+ tmpObj = kz_json_get_object(json_obj, "passive");
|
|
if(tmpObj != NULL) {
|
|
if(tmpObj != NULL) {
|
|
passive = json_object_get_int(tmpObj);
|
|
passive = json_object_get_int(tmpObj);
|
|
}
|
|
}
|
|
|
|
|
|
- tmpObj = json_object_object_get(json_obj, "durable");
|
|
|
|
|
|
+ tmpObj = kz_json_get_object(json_obj, "durable");
|
|
if(tmpObj != NULL) {
|
|
if(tmpObj != NULL) {
|
|
durable = json_object_get_int(tmpObj);
|
|
durable = json_object_get_int(tmpObj);
|
|
}
|
|
}
|
|
|
|
|
|
- tmpObj = json_object_object_get(json_obj, "exclusive");
|
|
|
|
|
|
+ tmpObj = kz_json_get_object(json_obj, "exclusive");
|
|
if(tmpObj != NULL) {
|
|
if(tmpObj != NULL) {
|
|
exclusive = json_object_get_int(tmpObj);
|
|
exclusive = json_object_get_int(tmpObj);
|
|
}
|
|
}
|
|
|
|
|
|
- tmpObj = json_object_object_get(json_obj, "auto_delete");
|
|
|
|
|
|
+ tmpObj = kz_json_get_object(json_obj, "auto_delete");
|
|
if(tmpObj != NULL) {
|
|
if(tmpObj != NULL) {
|
|
auto_delete = json_object_get_int(tmpObj);
|
|
auto_delete = json_object_get_int(tmpObj);
|
|
}
|
|
}
|
|
|
|
|
|
- tmpObj = json_object_object_get(json_obj, "no_ack");
|
|
|
|
|
|
+ tmpObj = kz_json_get_object(json_obj, "no_ack");
|
|
if(tmpObj != NULL) {
|
|
if(tmpObj != NULL) {
|
|
no_ack = json_object_get_int(tmpObj);
|
|
no_ack = json_object_get_int(tmpObj);
|
|
}
|
|
}
|
|
|
|
|
|
- tmpObj = json_object_object_get(json_obj, "wait_for_consumer_ack");
|
|
|
|
|
|
+ tmpObj = kz_json_get_object(json_obj, "wait_for_consumer_ack");
|
|
if(tmpObj != NULL) {
|
|
if(tmpObj != NULL) {
|
|
wait_for_consumer_ack = json_object_get_int(tmpObj);
|
|
wait_for_consumer_ack = json_object_get_int(tmpObj);
|
|
}
|
|
}
|
|
@@ -1194,26 +1182,26 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int i
|
|
goto error;
|
|
goto error;
|
|
}
|
|
}
|
|
|
|
|
|
- r = amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, amqp_empty_table);
|
|
|
|
|
|
+ r = amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table);
|
|
if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
goto error;
|
|
goto error;
|
|
}
|
|
}
|
|
|
|
|
|
- amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, amqp_empty_table);
|
|
|
|
|
|
+ amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
|
|
if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
ret = -RET_AMQP_ERROR;
|
|
ret = -RET_AMQP_ERROR;
|
|
goto error;
|
|
goto error;
|
|
}
|
|
}
|
|
|
|
|
|
- if (amqp_queue_bind(kz_conn->conn, channels[idx].channel, bind->queue, bind->exchange, bind->routing_key, amqp_empty_table) < 0
|
|
|
|
|
|
+ if (amqp_queue_bind(kz_conn->conn, channels[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0
|
|
|| kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
|| kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
goto error;
|
|
goto error;
|
|
}
|
|
}
|
|
|
|
|
|
- if (amqp_basic_consume(kz_conn->conn, channels[idx].channel, bind->queue, amqp_empty_bytes, 0, 1, 1, amqp_empty_table) < 0
|
|
|
|
|
|
+ if (amqp_basic_consume(kz_conn->conn, channels[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, 1, 1, kz_amqp_empty_table) < 0
|
|
|| kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
|
|
|| kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
goto error;
|
|
goto error;
|
|
@@ -1244,14 +1232,14 @@ int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind)
|
|
|
|
|
|
int idx = get_channel_index();
|
|
int idx = get_channel_index();
|
|
|
|
|
|
- amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, amqp_empty_table);
|
|
|
|
|
|
+ amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, kz_amqp_empty_table);
|
|
if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
ret = -RET_AMQP_ERROR;
|
|
ret = -RET_AMQP_ERROR;
|
|
goto error;
|
|
goto error;
|
|
}
|
|
}
|
|
|
|
|
|
- amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, amqp_empty_table);
|
|
|
|
|
|
+ amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
|
|
if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
ret = -RET_AMQP_ERROR;
|
|
ret = -RET_AMQP_ERROR;
|
|
@@ -1259,7 +1247,7 @@ int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind)
|
|
}
|
|
}
|
|
|
|
|
|
LM_DBG("QUEUE BIND\n");
|
|
LM_DBG("QUEUE BIND\n");
|
|
- if (amqp_queue_bind(kz_conn->conn, channels[idx].channel, bind->queue, bind->exchange, bind->routing_key, amqp_empty_table) < 0
|
|
|
|
|
|
+ if (amqp_queue_bind(kz_conn->conn, channels[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0
|
|
|| kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
|| kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
ret = -RET_AMQP_ERROR;
|
|
ret = -RET_AMQP_ERROR;
|
|
@@ -1267,7 +1255,7 @@ int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind)
|
|
}
|
|
}
|
|
|
|
|
|
LM_DBG("BASIC CONSUME\n");
|
|
LM_DBG("BASIC CONSUME\n");
|
|
- if (amqp_basic_consume(kz_conn->conn, channels[idx].channel, bind->queue, amqp_empty_bytes, 0, bind->no_ack, 0, amqp_empty_table) < 0
|
|
|
|
|
|
+ if (amqp_basic_consume(kz_conn->conn, channels[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0
|
|
|| kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
|
|
|| kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
ret = -RET_AMQP_ERROR;
|
|
ret = -RET_AMQP_ERROR;
|
|
@@ -1308,15 +1296,11 @@ int kz_amqp_send_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, kz_amqp_chann
|
|
routing_key = amqp_bytes_malloc_dup(amqp_cstring_bytes(cmd->routing_key));
|
|
routing_key = amqp_bytes_malloc_dup(amqp_cstring_bytes(cmd->routing_key));
|
|
payload = amqp_bytes_malloc_dup(amqp_cstring_bytes(cmd->payload));
|
|
payload = amqp_bytes_malloc_dup(amqp_cstring_bytes(cmd->payload));
|
|
|
|
|
|
- json_obj = json_tokener_parse(cmd->payload);
|
|
|
|
- if (is_error(json_obj))
|
|
|
|
- {
|
|
|
|
- LM_ERR("Error parsing json: %s\n",json_tokener_errors[-(unsigned long)json_obj]);
|
|
|
|
- LM_ERR("%s\n", cmd->payload);
|
|
|
|
- goto error;
|
|
|
|
- }
|
|
|
|
|
|
+ json_obj = kz_json_parse(cmd->payload);
|
|
|
|
+ if (json_obj == NULL)
|
|
|
|
+ goto error;
|
|
|
|
|
|
- if(json_object_object_get(json_obj, BLF_JSON_SERVERID) == NULL) {
|
|
|
|
|
|
+ if(kz_json_get_object(json_obj, BLF_JSON_SERVERID) == NULL) {
|
|
json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)channels[idx].targeted->routing_key.bytes));
|
|
json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)channels[idx].targeted->routing_key.bytes));
|
|
amqp_bytes_free(payload);
|
|
amqp_bytes_free(payload);
|
|
payload = amqp_bytes_malloc_dup(amqp_cstring_bytes((char*)json_object_to_json_string(json_obj)));
|
|
payload = amqp_bytes_malloc_dup(amqp_cstring_bytes((char*)json_object_to_json_string(json_obj)));
|
|
@@ -1412,13 +1396,9 @@ void kz_amqp_consumer_event(int child_no, char *payload, char* event_key, char*
|
|
|
|
|
|
eventData = payload;
|
|
eventData = payload;
|
|
|
|
|
|
- json_obj = json_tokener_parse(payload);
|
|
|
|
- if (is_error(json_obj))
|
|
|
|
- {
|
|
|
|
- LM_ERR("Error parsing json: %s\n",json_tokener_errors[-(unsigned long)json_obj]);
|
|
|
|
- LM_ERR("%s\n", payload);
|
|
|
|
|
|
+ json_obj = kz_json_parse(payload);
|
|
|
|
+ if (json_obj == NULL)
|
|
return;
|
|
return;
|
|
- }
|
|
|
|
|
|
|
|
char* key = (event_key == NULL ? dbk_consumer_event_key.s : event_key);
|
|
char* key = (event_key == NULL ? dbk_consumer_event_key.s : event_key);
|
|
char* subkey = (event_subkey == NULL ? dbk_consumer_event_subkey.s : event_subkey);
|
|
char* subkey = (event_subkey == NULL ? dbk_consumer_event_subkey.s : event_subkey);
|