|
@@ -1261,10 +1261,9 @@ int get_channel_index() {
|
|
int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx )
|
|
int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx )
|
|
{
|
|
{
|
|
kz_amqp_bind_ptr bind = channels[idx].targeted;
|
|
kz_amqp_bind_ptr bind = channels[idx].targeted;
|
|
- amqp_queue_declare_ok_t *r = NULL;
|
|
|
|
int ret = -1;
|
|
int ret = -1;
|
|
|
|
|
|
- r = amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table);
|
|
|
|
|
|
+ amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table);
|
|
if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
goto error;
|
|
goto error;
|
|
@@ -1297,7 +1296,6 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx )
|
|
int kz_amqp_bind_targeted_channel_ex(kz_amqp_conn_ptr kz_conn, int loopcount, int idx )
|
|
int kz_amqp_bind_targeted_channel_ex(kz_amqp_conn_ptr kz_conn, int loopcount, int idx )
|
|
{
|
|
{
|
|
kz_amqp_bind_ptr bind = NULL;
|
|
kz_amqp_bind_ptr bind = NULL;
|
|
- amqp_queue_declare_ok_t *r = NULL;
|
|
|
|
str rpl_exch = str_init("targeted");
|
|
str rpl_exch = str_init("targeted");
|
|
str rpl_exch_type = str_init("direct");
|
|
str rpl_exch_type = str_init("direct");
|
|
int ret = -1;
|
|
int ret = -1;
|
|
@@ -1325,7 +1323,7 @@ int kz_amqp_bind_targeted_channel_ex(kz_amqp_conn_ptr kz_conn, int loopcount, in
|
|
goto error;
|
|
goto error;
|
|
}
|
|
}
|
|
|
|
|
|
- r = amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table);
|
|
|
|
|
|
+ amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table);
|
|
if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
goto error;
|
|
goto error;
|
|
@@ -1738,12 +1736,12 @@ void kz_amqp_manager_loop(int child_no)
|
|
int INTERNAL_READ_COUNT , INTERNAL_READ_MAX_LOOP;
|
|
int INTERNAL_READ_COUNT , INTERNAL_READ_MAX_LOOP;
|
|
int CONSUMER_READ_COUNT , CONSUMER_READ_MAX_LOOP;
|
|
int CONSUMER_READ_COUNT , CONSUMER_READ_MAX_LOOP;
|
|
int ACK_READ_COUNT , ACK_READ_MAX_LOOP;
|
|
int ACK_READ_COUNT , ACK_READ_MAX_LOOP;
|
|
- char* payload;
|
|
|
|
int channel_res;
|
|
int channel_res;
|
|
kz_amqp_conn_ptr kzconn;
|
|
kz_amqp_conn_ptr kzconn;
|
|
kz_amqp_cmd_ptr cmd;
|
|
kz_amqp_cmd_ptr cmd;
|
|
int loopcount = 0;
|
|
int loopcount = 0;
|
|
int firstLoop = dbk_consume_messages_on_reconnect;
|
|
int firstLoop = dbk_consume_messages_on_reconnect;
|
|
|
|
+ char* payload = NULL;
|
|
|
|
|
|
|
|
|
|
while(1) {
|
|
while(1) {
|
|
@@ -2114,10 +2112,10 @@ void kz_amqp_consumer_proc(int child_no)
|
|
close(kz_pipe_fds[child_no*2+1]);
|
|
close(kz_pipe_fds[child_no*2+1]);
|
|
int i, idx;
|
|
int i, idx;
|
|
int OK;
|
|
int OK;
|
|
- char* payload;
|
|
|
|
int channel_res;
|
|
int channel_res;
|
|
kz_amqp_conn_ptr kzconn;
|
|
kz_amqp_conn_ptr kzconn;
|
|
kz_amqp_channel_ptr consumer_channels = NULL;
|
|
kz_amqp_channel_ptr consumer_channels = NULL;
|
|
|
|
+ char* payload = NULL;
|
|
|
|
|
|
kzconn = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn));
|
|
kzconn = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn));
|
|
if(kzconn == NULL)
|
|
if(kzconn == NULL)
|