|
@@ -5,7 +5,6 @@
|
|
|
#include <amqp_framing.h>
|
|
|
#include <amqp_tcp_socket.h>
|
|
|
#include <json.h>
|
|
|
-#include <uuid/uuid.h>
|
|
|
#include "../../mem/mem.h"
|
|
|
#include "../../timer_proc.h"
|
|
|
#include "../../sr_module.h"
|
|
@@ -47,6 +46,8 @@ extern int dbk_consume_messages_on_reconnect;
|
|
|
const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL };
|
|
|
const amqp_table_t kz_amqp_empty_table = { 0, NULL };
|
|
|
|
|
|
+char* last_payload_result = NULL;
|
|
|
+
|
|
|
|
|
|
static char *kz_amqp_str_dup(str *src)
|
|
|
{
|
|
@@ -332,6 +333,8 @@ void kz_amqp_destroy() {
|
|
|
shm_free(kz_pool);
|
|
|
}
|
|
|
|
|
|
+ if(last_payload_result != NULL)
|
|
|
+ free(last_payload_result);
|
|
|
|
|
|
}
|
|
|
|
|
@@ -618,6 +621,7 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
|
|
|
str unique_string = { 0, 0 };
|
|
|
char serverid[512];
|
|
|
|
|
|
+ /*
|
|
|
uuid_t id;
|
|
|
char uuid_buffer[40];
|
|
|
|
|
@@ -625,6 +629,8 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
|
|
|
uuid_unparse_lower(id, uuid_buffer);
|
|
|
unique_string.s = uuid_buffer;
|
|
|
unique_string.len = strlen(unique_string.s);
|
|
|
+ */
|
|
|
+ kz_generate_callid(&unique_string);
|
|
|
|
|
|
sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++);
|
|
|
|
|
@@ -688,6 +694,7 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
|
|
|
str unique_string = { 0, 0 };
|
|
|
char serverid[512];
|
|
|
|
|
|
+ /*
|
|
|
uuid_t id;
|
|
|
char uuid_buffer[40];
|
|
|
|
|
@@ -695,6 +702,9 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
|
|
|
uuid_unparse_lower(id, uuid_buffer);
|
|
|
unique_string.s = uuid_buffer;
|
|
|
unique_string.len = strlen(unique_string.s);
|
|
|
+ */
|
|
|
+ kz_generate_callid(&unique_string);
|
|
|
+
|
|
|
|
|
|
sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++);
|
|
|
|
|
@@ -796,9 +806,6 @@ int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char
|
|
|
|
|
|
};
|
|
|
|
|
|
-
|
|
|
-char* last_payload_result = NULL;
|
|
|
-
|
|
|
int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
|
|
|
{
|
|
|
return last_payload_result == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, last_payload_result);
|
|
@@ -812,7 +819,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha
|
|
|
str routing_key_s;
|
|
|
|
|
|
if(last_payload_result)
|
|
|
- pkg_free(last_payload_result);
|
|
|
+ free(last_payload_result);
|
|
|
|
|
|
last_payload_result = NULL;
|
|
|
|
|
@@ -849,7 +856,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha
|
|
|
|
|
|
char* strjson = (char*)json_object_to_json_string(ret);
|
|
|
int len = strlen(strjson);
|
|
|
- char* value = pkg_malloc(len+1);
|
|
|
+ char* value = malloc(len+1);
|
|
|
memcpy(value, strjson, len);
|
|
|
value[len] = '\0';
|
|
|
last_payload_result = value;
|
|
@@ -1154,7 +1161,7 @@ int get_channel_index() {
|
|
|
int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int idx )
|
|
|
{
|
|
|
kz_amqp_bind_ptr bind = NULL;
|
|
|
- amqp_queue_declare_ok_t *r = NULL;
|
|
|
+// amqp_queue_declare_ok_t *r = NULL;
|
|
|
str rpl_exch = str_init("targeted");
|
|
|
str rpl_exch_type = str_init("direct");
|
|
|
int ret = -1;
|
|
@@ -1182,13 +1189,13 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int i
|
|
|
goto error;
|
|
|
}
|
|
|
|
|
|
- r = amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table);
|
|
|
+ amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table);
|
|
|
if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
|
{
|
|
|
goto error;
|
|
|
}
|
|
|
|
|
|
- amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
|
|
|
+ amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
|
|
|
if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
|
{
|
|
|
ret = -RET_AMQP_ERROR;
|
|
@@ -1547,7 +1554,7 @@ void kz_amqp_manager_loop(int child_no)
|
|
|
int INTERNAL_READ_COUNT , INTERNAL_READ_MAX_LOOP;
|
|
|
int CONSUMER_READ_COUNT , CONSUMER_READ_MAX_LOOP;
|
|
|
int ACK_READ_COUNT , ACK_READ_MAX_LOOP;
|
|
|
- char* payload;
|
|
|
+// char* payload;
|
|
|
int channel_res;
|
|
|
kz_amqp_conn_ptr kzconn;
|
|
|
kz_amqp_cmd_ptr cmd;
|
|
@@ -1694,7 +1701,7 @@ void kz_amqp_manager_loop(int child_no)
|
|
|
|
|
|
CONSUMER_READ_COUNT = 0;
|
|
|
while(CONSUME && (CONSUMER_READ_COUNT < CONSUMER_READ_MAX_LOOP || firstLoop)) {
|
|
|
- payload = NULL;
|
|
|
+// payload = NULL;
|
|
|
CONSUMER_READ_COUNT++;
|
|
|
amqp_envelope_t envelope;
|
|
|
amqp_maybe_release_buffers(kzconn->conn);
|
|
@@ -1788,3 +1795,135 @@ void kz_amqp_manager_loop(int child_no)
|
|
|
kz_amqp_fire_connection_event("closed", kzconn->info.host);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * \brief Length of a Call-ID in TM
|
|
|
+ */
|
|
|
+#define CALLID_NR_LEN 20
|
|
|
+
|
|
|
+/**
|
|
|
+ * \brief Length of the Call-ID suffix
|
|
|
+ */
|
|
|
+#define CALLID_SUFFIX_LEN ( 1 /* - */ + \
|
|
|
+ 5 /* pid */ + \
|
|
|
+ 42 /* embedded v4inv6 address can be looong '128.' */ + \
|
|
|
+ 2 /* parenthesis [] */ + \
|
|
|
+ 1 /* ZT 0 */ + \
|
|
|
+ 16 /* one never knows ;-) */ \
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+static unsigned long callid_nr;
|
|
|
+static char callid_buf[CALLID_NR_LEN + CALLID_SUFFIX_LEN];
|
|
|
+
|
|
|
+static str callid_prefix;
|
|
|
+static str callid_suffix;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * \brief Initialize the Call-ID generator, generates random prefix
|
|
|
+ * \return 0 on success, -1 on error
|
|
|
+ */
|
|
|
+int kz_callid_init(void)
|
|
|
+{
|
|
|
+ int rand_bits, i;
|
|
|
+
|
|
|
+ /* calculate the initial call-id */
|
|
|
+ /* how many bits and chars do we need to display the
|
|
|
+ * whole ULONG number */
|
|
|
+ callid_prefix.len = sizeof(unsigned long) * 2;
|
|
|
+ callid_prefix.s = callid_buf;
|
|
|
+
|
|
|
+ if (callid_prefix.len > CALLID_NR_LEN) {
|
|
|
+ LOG(L_ERR, "ERROR: Too small callid buffer\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ for(rand_bits = 1, i = RAND_MAX; i; i >>= 1, rand_bits++); /* how long are the rand()s ? */
|
|
|
+ i = callid_prefix.len * 4 / rand_bits; /* how many rands() fit in the ULONG ? */
|
|
|
+
|
|
|
+ /* now fill in the callid with as many random
|
|
|
+ * numbers as you can + 1 */
|
|
|
+ callid_nr = rand(); /* this is the + 1 */
|
|
|
+
|
|
|
+ while(i--) {
|
|
|
+ callid_nr <<= rand_bits;
|
|
|
+ callid_nr |= rand();
|
|
|
+ }
|
|
|
+
|
|
|
+ i = snprintf(callid_prefix.s, callid_prefix.len + 1, "%0*lx", callid_prefix.len, callid_nr);
|
|
|
+ if ((i == -1) || (i > callid_prefix.len)) {
|
|
|
+ LOG(L_CRIT, "BUG: SORRY, callid calculation failed\n");
|
|
|
+ return -2;
|
|
|
+ }
|
|
|
+
|
|
|
+ DBG("Call-ID initialization: '%.*s'\n", callid_prefix.len, callid_prefix.s);
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * \brief Child initialization, generates suffix
|
|
|
+ * \param rank not used
|
|
|
+ * \return 0 on success, -1 on error
|
|
|
+ */
|
|
|
+int kz_callid_child_init(int rank)
|
|
|
+{
|
|
|
+ struct socket_info *si;
|
|
|
+
|
|
|
+ /* on tcp/tls bind_address is 0 so try to get the first address we listen
|
|
|
+ * on no matter the protocol */
|
|
|
+ si=bind_address?bind_address:get_first_socket();
|
|
|
+ if (si==0){
|
|
|
+ LOG(L_CRIT, "BUG: child_init_callid: null socket list\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ callid_suffix.s = callid_buf + callid_prefix.len;
|
|
|
+
|
|
|
+ callid_suffix.len = snprintf(callid_suffix.s, CALLID_SUFFIX_LEN,
|
|
|
+ "%c%d@%.*s", '-', my_pid(),
|
|
|
+ si->address_str.len,
|
|
|
+ si->address_str.s);
|
|
|
+ if ((callid_suffix.len == -1) || (callid_suffix.len > CALLID_SUFFIX_LEN)) {
|
|
|
+ LOG(L_ERR, "ERROR: child_init_callid: buffer too small\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ DBG("DEBUG: callid: '%.*s'\n", callid_prefix.len + callid_suffix.len, callid_prefix.s);
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * \brief Increment a character in hex, return the carry flag
|
|
|
+ * \param _c input character
|
|
|
+ * \return carry flag
|
|
|
+ */
|
|
|
+static inline int inc_hexchar(char* _c)
|
|
|
+{
|
|
|
+ if (*_c == '9') {
|
|
|
+ *_c = 'a';
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (*_c == 'f') {
|
|
|
+ *_c = '0';
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ (*_c)++;
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+void kz_generate_callid(str* callid)
|
|
|
+{
|
|
|
+ int i;
|
|
|
+
|
|
|
+ for(i = callid_prefix.len; i; i--) {
|
|
|
+ if (!inc_hexchar(callid_prefix.s + i - 1)) break;
|
|
|
+ }
|
|
|
+ callid->s = callid_prefix.s;
|
|
|
+ callid->len = callid_prefix.len + callid_suffix.len;
|
|
|
+}
|