|
@@ -47,6 +47,17 @@ const amqp_table_t kz_amqp_empty_table = { 0, NULL };
|
|
kz_amqp_zones_ptr kz_zones = NULL;
|
|
kz_amqp_zones_ptr kz_zones = NULL;
|
|
kz_amqp_zone_ptr kz_primary_zone = NULL;
|
|
kz_amqp_zone_ptr kz_primary_zone = NULL;
|
|
|
|
|
|
|
|
+
|
|
|
|
+amqp_exchange_declare_ok_t * AMQP_CALL kz_amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel,
|
|
|
|
+ amqp_bytes_t exchange, amqp_bytes_t type,
|
|
|
|
+ amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments) {
|
|
|
|
+#if AMQP_VERSION_MINOR == 5
|
|
|
|
+ return amqp_exchange_declare(state, channel, exchange, type, passive, durable, arguments);
|
|
|
|
+#else
|
|
|
|
+ return amqp_exchange_declare(state, channel, exchange, type, passive, durable, 0, 0, arguments);
|
|
|
|
+#endif
|
|
|
|
+}
|
|
|
|
+
|
|
int set_non_blocking(int fd)
|
|
int set_non_blocking(int fd)
|
|
{
|
|
{
|
|
int flags;
|
|
int flags;
|
|
@@ -1609,7 +1620,7 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx )
|
|
kz_amqp_bind_ptr bind = kz_conn->server->channels[idx].targeted;
|
|
kz_amqp_bind_ptr bind = kz_conn->server->channels[idx].targeted;
|
|
int ret = -1;
|
|
int ret = -1;
|
|
|
|
|
|
- amqp_exchange_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
|
|
|
|
|
|
+ kz_amqp_exchange_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
|
|
if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
ret = -RET_AMQP_ERROR;
|
|
ret = -RET_AMQP_ERROR;
|
|
@@ -1649,7 +1660,7 @@ int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int i
|
|
if(bind->federate == 0
|
|
if(bind->federate == 0
|
|
|| dbk_use_federated_exchange == 0
|
|
|| dbk_use_federated_exchange == 0
|
|
|| kz_conn->server->zone == kz_amqp_get_primary_zone()) {
|
|
|| kz_conn->server->zone == kz_amqp_get_primary_zone()) {
|
|
- amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
|
|
|
|
|
|
+ kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
|
|
if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
ret = -RET_AMQP_ERROR;
|
|
ret = -RET_AMQP_ERROR;
|
|
@@ -1661,7 +1672,7 @@ int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int i
|
|
&& dbk_use_federated_exchange == 1
|
|
&& dbk_use_federated_exchange == 1
|
|
&& kz_conn->server->zone != kz_amqp_get_primary_zone()) {
|
|
&& kz_conn->server->zone != kz_amqp_get_primary_zone()) {
|
|
federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s);
|
|
federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s);
|
|
- amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
|
|
|
|
|
|
+ kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
|
|
if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn)))
|
|
{
|
|
{
|
|
ret = -RET_AMQP_ERROR;
|
|
ret = -RET_AMQP_ERROR;
|
|
@@ -2548,7 +2559,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
|
|
{
|
|
{
|
|
int i, idx;
|
|
int i, idx;
|
|
int OK;
|
|
int OK;
|
|
- char* payload;
|
|
|
|
|
|
+// char* payload;
|
|
int channel_res;
|
|
int channel_res;
|
|
kz_amqp_conn_ptr consumer = NULL;
|
|
kz_amqp_conn_ptr consumer = NULL;
|
|
kz_amqp_channel_ptr consumer_channels = NULL;
|
|
kz_amqp_channel_ptr consumer_channels = NULL;
|
|
@@ -2622,7 +2633,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
|
|
LM_DBG("CONSUMER INIT DONE\n");
|
|
LM_DBG("CONSUMER INIT DONE\n");
|
|
|
|
|
|
while(OK) {
|
|
while(OK) {
|
|
- payload = NULL;
|
|
|
|
|
|
+// payload = NULL;
|
|
amqp_envelope_t envelope;
|
|
amqp_envelope_t envelope;
|
|
amqp_maybe_release_buffers(consumer->conn);
|
|
amqp_maybe_release_buffers(consumer->conn);
|
|
amqp_rpc_reply_t reply = amqp_consume_message(consumer->conn, &envelope, NULL, 0);
|
|
amqp_rpc_reply_t reply = amqp_consume_message(consumer->conn, &envelope, NULL, 0);
|