|
@@ -61,8 +61,9 @@
|
|
|
|
|
|
MODULE_VERSION
|
|
MODULE_VERSION
|
|
|
|
|
|
-static int rabbitmq_publish(struct sip_msg*, char*, char*, char*, char*);
|
|
|
|
-static int rabbitmq_publish_consume(struct sip_msg*, char*, char*, char*, char*, char*);
|
|
|
|
|
|
+static int rabbitmq_publish(struct sip_msg *, char *, char *, char *, char *);
|
|
|
|
+static int rabbitmq_publish_consume(
|
|
|
|
+ struct sip_msg *, char *, char *, char *, char *, char *);
|
|
static int mod_init(void);
|
|
static int mod_init(void);
|
|
static int mod_child_init(int);
|
|
static int mod_child_init(int);
|
|
|
|
|
|
@@ -83,14 +84,14 @@ static int rabbitmq_disconnect(amqp_connection_state_t *conn);
|
|
static int rabbitmq_reconnect(amqp_connection_state_t *conn);
|
|
static int rabbitmq_reconnect(amqp_connection_state_t *conn);
|
|
|
|
|
|
/* module fixup functions */
|
|
/* module fixup functions */
|
|
-static int fixup_params(void** param, int param_no)
|
|
|
|
|
|
+static int fixup_params(void **param, int param_no)
|
|
{
|
|
{
|
|
- if (param_no == 5) {
|
|
|
|
- if (fixup_pvar_null(param, 1) != 0) {
|
|
|
|
|
|
+ if(param_no == 5) {
|
|
|
|
+ if(fixup_pvar_null(param, 1) != 0) {
|
|
LM_ERR("failed to fixup result pvar\n");
|
|
LM_ERR("failed to fixup result pvar\n");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
- if (((pv_spec_t *)(*param))->setf == NULL) {
|
|
|
|
|
|
+ if(((pv_spec_t *)(*param))->setf == NULL) {
|
|
LM_ERR("result pvar is not writeble\n");
|
|
LM_ERR("result pvar is not writeble\n");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
@@ -102,9 +103,9 @@ static int fixup_params(void** param, int param_no)
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
|
|
-static int fixup_free_params(void** param, int param_no)
|
|
|
|
|
|
+static int fixup_free_params(void **param, int param_no)
|
|
{
|
|
{
|
|
- if (param_no == 5) {
|
|
|
|
|
|
+ if(param_no == 5) {
|
|
return fixup_free_pvar_null(param, 1);
|
|
return fixup_free_pvar_null(param, 1);
|
|
} else {
|
|
} else {
|
|
return fixup_free_spve_null(param, 1);
|
|
return fixup_free_spve_null(param, 1);
|
|
@@ -115,37 +116,39 @@ static int fixup_free_params(void** param, int param_no)
|
|
|
|
|
|
/* module commands */
|
|
/* module commands */
|
|
static cmd_export_t cmds[] = {
|
|
static cmd_export_t cmds[] = {
|
|
- {"rabbitmq_publish", (cmd_function)rabbitmq_publish, 4, fixup_params, fixup_free_params, REQUEST_ROUTE},
|
|
|
|
- {"rabbitmq_publish_consume", (cmd_function)rabbitmq_publish_consume, 5, fixup_params, fixup_free_params, REQUEST_ROUTE},
|
|
|
|
- { 0, 0, 0, 0, 0, 0}
|
|
|
|
|
|
+ {"rabbitmq_publish", (cmd_function)rabbitmq_publish, 4, fixup_params,
|
|
|
|
+ fixup_free_params, REQUEST_ROUTE},
|
|
|
|
+ {"rabbitmq_publish_consume", (cmd_function)rabbitmq_publish_consume, 5,
|
|
|
|
+ fixup_params, fixup_free_params, REQUEST_ROUTE},
|
|
|
|
+ {0, 0, 0, 0, 0, 0}
|
|
};
|
|
};
|
|
|
|
|
|
/* module parameters */
|
|
/* module parameters */
|
|
-static param_export_t params[] = {
|
|
|
|
- {"url", PARAM_STRING, &amqp_url},
|
|
|
|
|
|
+static param_export_t params[] = {{"url", PARAM_STRING, &amqp_url},
|
|
{"timeout_sec", PARAM_INT, &timeout_sec},
|
|
{"timeout_sec", PARAM_INT, &timeout_sec},
|
|
{"timeout_usec", PARAM_INT, &timeout_usec},
|
|
{"timeout_usec", PARAM_INT, &timeout_usec},
|
|
{"direct_reply_to", PARAM_INT, &direct_reply_to},
|
|
{"direct_reply_to", PARAM_INT, &direct_reply_to},
|
|
- { 0, 0, 0}
|
|
|
|
|
|
+ {0, 0, 0}
|
|
};
|
|
};
|
|
|
|
|
|
/* module exports */
|
|
/* module exports */
|
|
struct module_exports exports = {
|
|
struct module_exports exports = {
|
|
"rabbitmq", DEFAULT_DLFLAGS, /* dlopen flags */
|
|
"rabbitmq", DEFAULT_DLFLAGS, /* dlopen flags */
|
|
- cmds, /* Exported functions */
|
|
|
|
- params, 0, /* exported statistics */
|
|
|
|
- 0, /* exported MI functions */
|
|
|
|
- 0, /* exported pseudo-variables */
|
|
|
|
- 0, /* extra processes */
|
|
|
|
- mod_init, /* module initialization function */
|
|
|
|
|
|
+ cmds, /* Exported functions */
|
|
|
|
+ params, 0, /* exported statistics */
|
|
|
|
+ 0, /* exported MI functions */
|
|
|
|
+ 0, /* exported pseudo-variables */
|
|
|
|
+ 0, /* extra processes */
|
|
|
|
+ mod_init, /* module initialization function */
|
|
0,
|
|
0,
|
|
- 0, mod_child_init /* per-child init function */
|
|
|
|
|
|
+ 0,
|
|
|
|
+ mod_child_init /* per-child init function */
|
|
};
|
|
};
|
|
|
|
|
|
/* module init */
|
|
/* module init */
|
|
static int mod_init(void)
|
|
static int mod_init(void)
|
|
{
|
|
{
|
|
- if (amqp_parse_url(amqp_url, &amqp_info) == AMQP_STATUS_BAD_URL) {
|
|
|
|
|
|
+ if(amqp_parse_url(amqp_url, &amqp_info) == AMQP_STATUS_BAD_URL) {
|
|
LM_ERR("FAIL parsing url: '%s'\n", amqp_url);
|
|
LM_ERR("FAIL parsing url: '%s'\n", amqp_url);
|
|
return -1;
|
|
return -1;
|
|
} else {
|
|
} else {
|
|
@@ -157,14 +160,15 @@ static int mod_init(void)
|
|
|
|
|
|
|
|
|
|
/* module child init */
|
|
/* module child init */
|
|
-static int mod_child_init(int rank) {
|
|
|
|
|
|
+static int mod_child_init(int rank)
|
|
|
|
+{
|
|
// main and tcp manager process
|
|
// main and tcp manager process
|
|
- if (rank == PROC_MAIN || rank == PROC_TCP_MAIN) {
|
|
|
|
|
|
+ if(rank == PROC_MAIN || rank == PROC_TCP_MAIN) {
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
// routing process
|
|
// routing process
|
|
- if (rabbitmq_connect(&conn) != RABBITMQ_OK) {
|
|
|
|
|
|
+ if(rabbitmq_connect(&conn) != RABBITMQ_OK) {
|
|
LM_ERR("FAIL rabbitmq_connect()");
|
|
LM_ERR("FAIL rabbitmq_connect()");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
@@ -174,28 +178,30 @@ static int mod_child_init(int rank) {
|
|
}
|
|
}
|
|
|
|
|
|
/* module helper functions */
|
|
/* module helper functions */
|
|
-static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_routingkey, char* in_contenttype, char* in_messagebody) {
|
|
|
|
|
|
+static int rabbitmq_publish(struct sip_msg *msg, char *in_exchange,
|
|
|
|
+ char *in_routingkey, char *in_contenttype, char *in_messagebody)
|
|
|
|
+{
|
|
int reconnect_attempts = 0;
|
|
int reconnect_attempts = 0;
|
|
int log_ret;
|
|
int log_ret;
|
|
str exchange, routingkey, messagebody, contenttype;
|
|
str exchange, routingkey, messagebody, contenttype;
|
|
|
|
|
|
// sanity checks
|
|
// sanity checks
|
|
- if (get_str_fparam(&exchange, msg, (fparam_t*)in_exchange) < 0) {
|
|
|
|
|
|
+ if(get_str_fparam(&exchange, msg, (fparam_t *)in_exchange) < 0) {
|
|
LM_ERR("failed to get exchange\n");
|
|
LM_ERR("failed to get exchange\n");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
|
|
- if (get_str_fparam(&routingkey, msg, (fparam_t*)in_routingkey) < 0) {
|
|
|
|
|
|
+ if(get_str_fparam(&routingkey, msg, (fparam_t *)in_routingkey) < 0) {
|
|
LM_ERR("failed to get kouting key\n");
|
|
LM_ERR("failed to get kouting key\n");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
|
|
- if (get_str_fparam(&messagebody, msg, (fparam_t*)in_messagebody) < 0) {
|
|
|
|
|
|
+ if(get_str_fparam(&messagebody, msg, (fparam_t *)in_messagebody) < 0) {
|
|
LM_ERR("failed to get message body\n");
|
|
LM_ERR("failed to get message body\n");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
|
|
- if (get_str_fparam(&contenttype, msg, (fparam_t*)in_contenttype) < 0) {
|
|
|
|
|
|
+ if(get_str_fparam(&contenttype, msg, (fparam_t *)in_contenttype) < 0) {
|
|
LM_ERR("failed to get content type\n");
|
|
LM_ERR("failed to get content type\n");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
@@ -203,20 +209,21 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_rou
|
|
reconnect:
|
|
reconnect:
|
|
// open channel
|
|
// open channel
|
|
amqp_channel_open(conn, 1);
|
|
amqp_channel_open(conn, 1);
|
|
- log_ret = log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_channel_open()");
|
|
|
|
|
|
+ log_ret =
|
|
|
|
+ log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_channel_open()");
|
|
|
|
|
|
// open channel - failed
|
|
// open channel - failed
|
|
- if (log_ret != AMQP_RESPONSE_NORMAL) {
|
|
|
|
|
|
+ if(log_ret != AMQP_RESPONSE_NORMAL) {
|
|
// reconnect - debug
|
|
// reconnect - debug
|
|
LM_ERR("FAIL: rabbitmq_reconnect(), attempts=%d\n", reconnect_attempts);
|
|
LM_ERR("FAIL: rabbitmq_reconnect(), attempts=%d\n", reconnect_attempts);
|
|
|
|
|
|
// reconnect
|
|
// reconnect
|
|
- if (reconnect_attempts < max_reconnect_attempts) {
|
|
|
|
|
|
+ if(reconnect_attempts < max_reconnect_attempts) {
|
|
// reconnect - debug
|
|
// reconnect - debug
|
|
LM_ERR("RETRY: rabbitmq_reconnect()\n");
|
|
LM_ERR("RETRY: rabbitmq_reconnect()\n");
|
|
|
|
|
|
// reconnect - success
|
|
// reconnect - success
|
|
- if (rabbitmq_reconnect(&conn) == RABBITMQ_OK) {
|
|
|
|
|
|
+ if(rabbitmq_reconnect(&conn) == RABBITMQ_OK) {
|
|
// reconnect - debug
|
|
// reconnect - debug
|
|
LM_ERR("SUCCESS: rabbitmq_reconnect()\n");
|
|
LM_ERR("SUCCESS: rabbitmq_reconnect()\n");
|
|
}
|
|
}
|
|
@@ -235,30 +242,26 @@ reconnect:
|
|
|
|
|
|
// alloc properties
|
|
// alloc properties
|
|
amqp_basic_properties_t props;
|
|
amqp_basic_properties_t props;
|
|
- props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
|
|
|
|
- AMQP_BASIC_DELIVERY_MODE_FLAG |
|
|
|
|
- AMQP_BASIC_CORRELATION_ID_FLAG;
|
|
|
|
|
|
+ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG
|
|
|
|
+ | AMQP_BASIC_CORRELATION_ID_FLAG;
|
|
props.content_type = amqp_cstring_bytes(contenttype.s);
|
|
props.content_type = amqp_cstring_bytes(contenttype.s);
|
|
props.delivery_mode = 2; /* persistent delivery mode */
|
|
props.delivery_mode = 2; /* persistent delivery mode */
|
|
props.correlation_id = amqp_cstring_bytes("1");
|
|
props.correlation_id = amqp_cstring_bytes("1");
|
|
|
|
|
|
// publish
|
|
// publish
|
|
- if (log_on_error(amqp_basic_publish(conn,1,
|
|
|
|
- amqp_cstring_bytes(exchange.s),
|
|
|
|
- amqp_cstring_bytes(routingkey.s),
|
|
|
|
- 0,
|
|
|
|
- 0,
|
|
|
|
- &props,
|
|
|
|
- amqp_cstring_bytes(messagebody.s)),
|
|
|
|
- "amqp_basic_publish()") != AMQP_RESPONSE_NORMAL) {
|
|
|
|
- // debug
|
|
|
|
- LM_ERR("FAIL: amqp_basic_publish()\n");
|
|
|
|
-
|
|
|
|
- // cleanup
|
|
|
|
- amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
|
|
|
|
|
+ if(log_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.s),
|
|
|
|
+ amqp_cstring_bytes(routingkey.s), 0, 0, &props,
|
|
|
|
+ amqp_cstring_bytes(messagebody.s)),
|
|
|
|
+ "amqp_basic_publish()")
|
|
|
|
+ != AMQP_RESPONSE_NORMAL) {
|
|
|
|
+ // debug
|
|
|
|
+ LM_ERR("FAIL: amqp_basic_publish()\n");
|
|
|
|
+
|
|
|
|
+ // cleanup
|
|
|
|
+ amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
|
|
|
|
|
- // error
|
|
|
|
- return RABBITMQ_ERR_PUBLISH;
|
|
|
|
|
|
+ // error
|
|
|
|
+ return RABBITMQ_ERR_PUBLISH;
|
|
}
|
|
}
|
|
|
|
|
|
// debug
|
|
// debug
|
|
@@ -271,7 +274,10 @@ reconnect:
|
|
return RABBITMQ_OK;
|
|
return RABBITMQ_OK;
|
|
}
|
|
}
|
|
|
|
|
|
-static int rabbitmq_publish_consume(struct sip_msg* msg, char* in_exchange, char* in_routingkey, char* in_contenttype, char* in_messagebody, char* reply) {
|
|
|
|
|
|
+static int rabbitmq_publish_consume(struct sip_msg *msg, char *in_exchange,
|
|
|
|
+ char *in_routingkey, char *in_contenttype, char *in_messagebody,
|
|
|
|
+ char *reply)
|
|
|
|
+{
|
|
pv_spec_t *dst;
|
|
pv_spec_t *dst;
|
|
pv_value_t val;
|
|
pv_value_t val;
|
|
str exchange, routingkey, messagebody, contenttype;
|
|
str exchange, routingkey, messagebody, contenttype;
|
|
@@ -285,8 +291,8 @@ static int rabbitmq_publish_consume(struct sip_msg* msg, char* in_exchange, char
|
|
size_t body_received;
|
|
size_t body_received;
|
|
|
|
|
|
struct timeval tv;
|
|
struct timeval tv;
|
|
- tv.tv_sec=timeout_sec;
|
|
|
|
- tv.tv_usec=timeout_usec;
|
|
|
|
|
|
+ tv.tv_sec = timeout_sec;
|
|
|
|
+ tv.tv_usec = timeout_usec;
|
|
|
|
|
|
amqp_queue_declare_ok_t *reply_to;
|
|
amqp_queue_declare_ok_t *reply_to;
|
|
|
|
|
|
@@ -295,22 +301,22 @@ static int rabbitmq_publish_consume(struct sip_msg* msg, char* in_exchange, char
|
|
char reply_to_buffer[64];
|
|
char reply_to_buffer[64];
|
|
|
|
|
|
// sanity checks
|
|
// sanity checks
|
|
- if (get_str_fparam(&exchange, msg, (fparam_t*)in_exchange) < 0) {
|
|
|
|
|
|
+ if(get_str_fparam(&exchange, msg, (fparam_t *)in_exchange) < 0) {
|
|
LM_ERR("failed to get exchange\n");
|
|
LM_ERR("failed to get exchange\n");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
|
|
- if (get_str_fparam(&routingkey, msg, (fparam_t*)in_routingkey) < 0) {
|
|
|
|
|
|
+ if(get_str_fparam(&routingkey, msg, (fparam_t *)in_routingkey) < 0) {
|
|
LM_ERR("failed to get kouting key\n");
|
|
LM_ERR("failed to get kouting key\n");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
|
|
- if (get_str_fparam(&messagebody, msg, (fparam_t*)in_messagebody) < 0) {
|
|
|
|
|
|
+ if(get_str_fparam(&messagebody, msg, (fparam_t *)in_messagebody) < 0) {
|
|
LM_ERR("failed to get message body\n");
|
|
LM_ERR("failed to get message body\n");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
|
|
- if (get_str_fparam(&contenttype, msg, (fparam_t*)in_contenttype) < 0) {
|
|
|
|
|
|
+ if(get_str_fparam(&contenttype, msg, (fparam_t *)in_contenttype) < 0) {
|
|
LM_ERR("failed to get content type\n");
|
|
LM_ERR("failed to get content type\n");
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
@@ -318,20 +324,21 @@ static int rabbitmq_publish_consume(struct sip_msg* msg, char* in_exchange, char
|
|
reconnect:
|
|
reconnect:
|
|
// open channel
|
|
// open channel
|
|
amqp_channel_open(conn, 1);
|
|
amqp_channel_open(conn, 1);
|
|
- log_ret = log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_channel_open()");
|
|
|
|
|
|
+ log_ret =
|
|
|
|
+ log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_channel_open()");
|
|
|
|
|
|
// open channel - failed
|
|
// open channel - failed
|
|
- if (log_ret != AMQP_RESPONSE_NORMAL) {
|
|
|
|
|
|
+ if(log_ret != AMQP_RESPONSE_NORMAL) {
|
|
// reconnect - debug
|
|
// reconnect - debug
|
|
LM_ERR("FAIL: rabbitmq_reconnect(), attempts=%d\n", reconnect_attempts);
|
|
LM_ERR("FAIL: rabbitmq_reconnect(), attempts=%d\n", reconnect_attempts);
|
|
|
|
|
|
// reconnect
|
|
// reconnect
|
|
- if (reconnect_attempts < max_reconnect_attempts) {
|
|
|
|
|
|
+ if(reconnect_attempts < max_reconnect_attempts) {
|
|
// reconnect - debug
|
|
// reconnect - debug
|
|
LM_ERR("RETRY: rabbitmq_reconnect()\n");
|
|
LM_ERR("RETRY: rabbitmq_reconnect()\n");
|
|
|
|
|
|
// reconnect - success
|
|
// reconnect - success
|
|
- if (rabbitmq_reconnect(&conn) == RABBITMQ_OK) {
|
|
|
|
|
|
+ if(rabbitmq_reconnect(&conn) == RABBITMQ_OK) {
|
|
// reconnect - debug
|
|
// reconnect - debug
|
|
LM_ERR("SUCCESS: rabbitmq_reconnect()\n");
|
|
LM_ERR("SUCCESS: rabbitmq_reconnect()\n");
|
|
}
|
|
}
|
|
@@ -349,8 +356,10 @@ reconnect:
|
|
}
|
|
}
|
|
|
|
|
|
// alloc reply_to queue
|
|
// alloc reply_to queue
|
|
- if (direct_reply_to == 1) {
|
|
|
|
- reply_to = amqp_queue_declare(conn, 1, amqp_cstring_bytes("amq.rabbitmq.reply-to"), 0, 0, 1, 1, amqp_empty_table);
|
|
|
|
|
|
+ if(direct_reply_to == 1) {
|
|
|
|
+ reply_to = amqp_queue_declare(conn, 1,
|
|
|
|
+ amqp_cstring_bytes("amq.rabbitmq.reply-to"), 0, 0, 1, 1,
|
|
|
|
+ amqp_empty_table);
|
|
} else {
|
|
} else {
|
|
uuid_generate_random(uuid);
|
|
uuid_generate_random(uuid);
|
|
uuid_unparse(uuid, uuid_buffer);
|
|
uuid_unparse(uuid, uuid_buffer);
|
|
@@ -358,27 +367,29 @@ reconnect:
|
|
strcpy(reply_to_buffer, "kamailio-");
|
|
strcpy(reply_to_buffer, "kamailio-");
|
|
strcat(reply_to_buffer, uuid_buffer);
|
|
strcat(reply_to_buffer, uuid_buffer);
|
|
|
|
|
|
- reply_to = amqp_queue_declare(conn, 1, amqp_cstring_bytes(reply_to_buffer), 0, 0, 1, 1, amqp_empty_table);
|
|
|
|
|
|
+ reply_to =
|
|
|
|
+ amqp_queue_declare(conn, 1, amqp_cstring_bytes(reply_to_buffer),
|
|
|
|
+ 0, 0, 1, 1, amqp_empty_table);
|
|
}
|
|
}
|
|
|
|
|
|
- if (log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_queue_declare()") != AMQP_RESPONSE_NORMAL) {
|
|
|
|
|
|
+ if(log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_queue_declare()")
|
|
|
|
+ != AMQP_RESPONSE_NORMAL) {
|
|
LM_ERR("FAIL: amqp_queue_declare()\n");
|
|
LM_ERR("FAIL: amqp_queue_declare()\n");
|
|
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
|
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
|
return RABBITMQ_ERR_QUEUE;
|
|
return RABBITMQ_ERR_QUEUE;
|
|
}
|
|
}
|
|
|
|
|
|
- LM_INFO("reply_to = %.*s\n", (int)reply_to->queue.len, (char*)reply_to->queue.bytes);
|
|
|
|
|
|
+ LM_INFO("reply_to = %.*s\n", (int)reply_to->queue.len,
|
|
|
|
+ (char *)reply_to->queue.bytes);
|
|
|
|
|
|
// alloc request properties
|
|
// alloc request properties
|
|
amqp_basic_properties_t props;
|
|
amqp_basic_properties_t props;
|
|
- props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
|
|
|
|
- AMQP_BASIC_DELIVERY_MODE_FLAG |
|
|
|
|
- AMQP_BASIC_REPLY_TO_FLAG |
|
|
|
|
- AMQP_BASIC_CORRELATION_ID_FLAG;
|
|
|
|
|
|
+ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG
|
|
|
|
+ | AMQP_BASIC_REPLY_TO_FLAG | AMQP_BASIC_CORRELATION_ID_FLAG;
|
|
props.content_type = amqp_cstring_bytes(contenttype.s);
|
|
props.content_type = amqp_cstring_bytes(contenttype.s);
|
|
props.delivery_mode = 2; /* persistent delivery mode */
|
|
props.delivery_mode = 2; /* persistent delivery mode */
|
|
props.reply_to = reply_to->queue;
|
|
props.reply_to = reply_to->queue;
|
|
- if (props.reply_to.bytes == NULL) {
|
|
|
|
|
|
+ if(props.reply_to.bytes == NULL) {
|
|
LM_ERR("Out of memory while copying queue name");
|
|
LM_ERR("Out of memory while copying queue name");
|
|
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
|
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
|
return -1;
|
|
return -1;
|
|
@@ -386,32 +397,31 @@ reconnect:
|
|
props.correlation_id = amqp_cstring_bytes("1");
|
|
props.correlation_id = amqp_cstring_bytes("1");
|
|
|
|
|
|
// start consume
|
|
// start consume
|
|
- amqp_basic_consume(conn, 1, reply_to->queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
|
|
|
|
- if (log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_basic_consume()") != AMQP_RESPONSE_NORMAL) {
|
|
|
|
|
|
+ amqp_basic_consume(conn, 1, reply_to->queue, amqp_empty_bytes, 0, 1, 0,
|
|
|
|
+ amqp_empty_table);
|
|
|
|
+ if(log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_basic_consume()")
|
|
|
|
+ != AMQP_RESPONSE_NORMAL) {
|
|
LM_ERR("FAIL: amqp_basic_consume()\n");
|
|
LM_ERR("FAIL: amqp_basic_consume()\n");
|
|
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
|
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
|
return RABBITMQ_ERR_CONSUME;
|
|
return RABBITMQ_ERR_CONSUME;
|
|
}
|
|
}
|
|
|
|
|
|
// publish
|
|
// publish
|
|
- if (log_on_error(amqp_basic_publish(conn,1,
|
|
|
|
- amqp_cstring_bytes(exchange.s),
|
|
|
|
- amqp_cstring_bytes(routingkey.s),
|
|
|
|
- 0,
|
|
|
|
- 0,
|
|
|
|
- &props,
|
|
|
|
- amqp_cstring_bytes(messagebody.s)),
|
|
|
|
- "amqp_basic_publish()") != AMQP_RESPONSE_NORMAL) {
|
|
|
|
- LM_ERR("FAIL: amqp_basic_publish()\n");
|
|
|
|
- amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
|
|
|
- return RABBITMQ_ERR_PUBLISH;
|
|
|
|
|
|
+ if(log_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.s),
|
|
|
|
+ amqp_cstring_bytes(routingkey.s), 0, 0, &props,
|
|
|
|
+ amqp_cstring_bytes(messagebody.s)),
|
|
|
|
+ "amqp_basic_publish()")
|
|
|
|
+ != AMQP_RESPONSE_NORMAL) {
|
|
|
|
+ LM_ERR("FAIL: amqp_basic_publish()\n");
|
|
|
|
+ amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
|
|
|
+ return RABBITMQ_ERR_PUBLISH;
|
|
}
|
|
}
|
|
|
|
|
|
// consume frame
|
|
// consume frame
|
|
- for (;;) {
|
|
|
|
|
|
+ for(;;) {
|
|
amqp_maybe_release_buffers(conn);
|
|
amqp_maybe_release_buffers(conn);
|
|
result = amqp_simple_wait_frame_noblock(conn, &frame, &tv);
|
|
result = amqp_simple_wait_frame_noblock(conn, &frame, &tv);
|
|
- if (result < 0) {
|
|
|
|
|
|
+ if(result < 0) {
|
|
LM_ERR("amqp_simple_wait_frame_noblock() error: %d\n", result);
|
|
LM_ERR("amqp_simple_wait_frame_noblock() error: %d\n", result);
|
|
result = -1;
|
|
result = -1;
|
|
break;
|
|
break;
|
|
@@ -420,23 +430,23 @@ reconnect:
|
|
}
|
|
}
|
|
|
|
|
|
LM_DBG("Frame type: %u channel: %u\n", frame.frame_type, frame.channel);
|
|
LM_DBG("Frame type: %u channel: %u\n", frame.frame_type, frame.channel);
|
|
- if (frame.frame_type != AMQP_FRAME_METHOD) {
|
|
|
|
|
|
+ if(frame.frame_type != AMQP_FRAME_METHOD) {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
LM_DBG("Method: %s\n", amqp_method_name(frame.payload.method.id));
|
|
LM_DBG("Method: %s\n", amqp_method_name(frame.payload.method.id));
|
|
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
|
|
|
|
|
|
+ if(frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
- d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
|
|
|
|
|
|
+ d = (amqp_basic_deliver_t *)frame.payload.method.decoded;
|
|
LM_DBG("Delivery: %u exchange: %.*s routingkey: %.*s\n",
|
|
LM_DBG("Delivery: %u exchange: %.*s routingkey: %.*s\n",
|
|
- (unsigned) d->delivery_tag,
|
|
|
|
- (int) d->exchange.len, (char *) d->exchange.bytes,
|
|
|
|
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
|
|
|
|
|
|
+ (unsigned)d->delivery_tag, (int)d->exchange.len,
|
|
|
|
+ (char *)d->exchange.bytes, (int)d->routing_key.len,
|
|
|
|
+ (char *)d->routing_key.bytes);
|
|
|
|
|
|
result = amqp_simple_wait_frame_noblock(conn, &frame, &tv);
|
|
result = amqp_simple_wait_frame_noblock(conn, &frame, &tv);
|
|
- if (result < 0) {
|
|
|
|
|
|
+ if(result < 0) {
|
|
LM_ERR("amqp_simple_wait_frame_noblock() error: %d\n", result);
|
|
LM_ERR("amqp_simple_wait_frame_noblock() error: %d\n", result);
|
|
result = -1;
|
|
result = -1;
|
|
break;
|
|
break;
|
|
@@ -444,24 +454,24 @@ reconnect:
|
|
result = RABBITMQ_OK;
|
|
result = RABBITMQ_OK;
|
|
}
|
|
}
|
|
|
|
|
|
- if (frame.frame_type != AMQP_FRAME_HEADER) {
|
|
|
|
|
|
+ if(frame.frame_type != AMQP_FRAME_HEADER) {
|
|
LM_ERR("Expected header!");
|
|
LM_ERR("Expected header!");
|
|
result = -1;
|
|
result = -1;
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
|
|
|
|
- p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
|
|
|
|
- if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
|
|
|
|
- LM_DBG("Content-type: %.*s\n",
|
|
|
|
- (int) p->content_type.len, (char *) p->content_type.bytes);
|
|
|
|
|
|
+ p = (amqp_basic_properties_t *)frame.payload.properties.decoded;
|
|
|
|
+ if(p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
|
|
|
|
+ LM_DBG("Content-type: %.*s\n", (int)p->content_type.len,
|
|
|
|
+ (char *)p->content_type.bytes);
|
|
}
|
|
}
|
|
|
|
|
|
body_target = (size_t)frame.payload.properties.body_size;
|
|
body_target = (size_t)frame.payload.properties.body_size;
|
|
body_received = 0;
|
|
body_received = 0;
|
|
|
|
|
|
- while (body_received < body_target) {
|
|
|
|
|
|
+ while(body_received < body_target) {
|
|
result = amqp_simple_wait_frame_noblock(conn, &frame, &tv);
|
|
result = amqp_simple_wait_frame_noblock(conn, &frame, &tv);
|
|
- if (result < 0) {
|
|
|
|
|
|
+ if(result < 0) {
|
|
LM_ERR("amqp_simple_wait_frame_noblock() error: %d\n", result);
|
|
LM_ERR("amqp_simple_wait_frame_noblock() error: %d\n", result);
|
|
result = -1;
|
|
result = -1;
|
|
break;
|
|
break;
|
|
@@ -469,7 +479,7 @@ reconnect:
|
|
result = RABBITMQ_OK;
|
|
result = RABBITMQ_OK;
|
|
}
|
|
}
|
|
|
|
|
|
- if (frame.frame_type != AMQP_FRAME_BODY) {
|
|
|
|
|
|
+ if(frame.frame_type != AMQP_FRAME_BODY) {
|
|
LM_ERR("Expected body!");
|
|
LM_ERR("Expected body!");
|
|
result = -1;
|
|
result = -1;
|
|
break;
|
|
break;
|
|
@@ -478,7 +488,7 @@ reconnect:
|
|
body_received += frame.payload.body_fragment.len;
|
|
body_received += frame.payload.body_fragment.len;
|
|
assert(body_received <= body_target);
|
|
assert(body_received <= body_target);
|
|
|
|
|
|
- val.rs.s = (char*)frame.payload.body_fragment.bytes;
|
|
|
|
|
|
+ val.rs.s = (char *)frame.payload.body_fragment.bytes;
|
|
val.rs.len = (int)frame.payload.body_fragment.len;
|
|
val.rs.len = (int)frame.payload.body_fragment.len;
|
|
|
|
|
|
LM_DBG("RPC Call result: %.*s\n", val.rs.len, val.rs.s);
|
|
LM_DBG("RPC Call result: %.*s\n", val.rs.len, val.rs.s);
|
|
@@ -488,7 +498,7 @@ reconnect:
|
|
}
|
|
}
|
|
|
|
|
|
// amqp_simple_wait_frame <= 0
|
|
// amqp_simple_wait_frame <= 0
|
|
- if (body_received != body_target) {
|
|
|
|
|
|
+ if(body_received != body_target) {
|
|
LM_ERR("body_received != body_target'\n");
|
|
LM_ERR("body_received != body_target'\n");
|
|
result = -1;
|
|
result = -1;
|
|
break;
|
|
break;
|
|
@@ -503,34 +513,39 @@ reconnect:
|
|
return result;
|
|
return result;
|
|
}
|
|
}
|
|
|
|
|
|
-static int rabbitmq_connect(amqp_connection_state_t *conn) {
|
|
|
|
|
|
+static int rabbitmq_connect(amqp_connection_state_t *conn)
|
|
|
|
+{
|
|
int ret;
|
|
int ret;
|
|
int log_ret;
|
|
int log_ret;
|
|
-// amqp_rpc_reply_t reply;
|
|
|
|
|
|
+ // amqp_rpc_reply_t reply;
|
|
|
|
|
|
// establish a new connection to RabbitMQ server
|
|
// establish a new connection to RabbitMQ server
|
|
*conn = amqp_new_connection();
|
|
*conn = amqp_new_connection();
|
|
- log_ret = log_on_amqp_error(amqp_get_rpc_reply(*conn), "amqp_new_connection()");
|
|
|
|
- if (log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
|
|
|
|
|
|
+ log_ret = log_on_amqp_error(
|
|
|
|
+ amqp_get_rpc_reply(*conn), "amqp_new_connection()");
|
|
|
|
+ if(log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
|
|
return RABBITMQ_ERR_CONNECT;
|
|
return RABBITMQ_ERR_CONNECT;
|
|
}
|
|
}
|
|
|
|
|
|
amqp_sock = amqp_tcp_socket_new(*conn);
|
|
amqp_sock = amqp_tcp_socket_new(*conn);
|
|
- if (!amqp_sock) {
|
|
|
|
|
|
+ if(!amqp_sock) {
|
|
LM_ERR("FAIL: create TCP amqp_sock");
|
|
LM_ERR("FAIL: create TCP amqp_sock");
|
|
amqp_destroy_connection(*conn);
|
|
amqp_destroy_connection(*conn);
|
|
return RABBITMQ_ERR_SOCK;
|
|
return RABBITMQ_ERR_SOCK;
|
|
}
|
|
}
|
|
|
|
|
|
ret = amqp_socket_open(amqp_sock, amqp_info.host, amqp_info.port);
|
|
ret = amqp_socket_open(amqp_sock, amqp_info.host, amqp_info.port);
|
|
- if (ret != AMQP_STATUS_OK) {
|
|
|
|
|
|
+ if(ret != AMQP_STATUS_OK) {
|
|
LM_ERR("FAIL: open TCP sock, amqp_status=%d", ret);
|
|
LM_ERR("FAIL: open TCP sock, amqp_status=%d", ret);
|
|
// amqp_destroy_connection(*conn);
|
|
// amqp_destroy_connection(*conn);
|
|
return RABBITMQ_ERR_SOCK;
|
|
return RABBITMQ_ERR_SOCK;
|
|
}
|
|
}
|
|
|
|
|
|
- log_ret = log_on_amqp_error(amqp_login(*conn, amqp_info.vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, amqp_info.user, amqp_info.password), "amqp_login()");
|
|
|
|
- if (log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
|
|
|
|
|
|
+ log_ret = log_on_amqp_error(
|
|
|
|
+ amqp_login(*conn, amqp_info.vhost, 0, 131072, 0,
|
|
|
|
+ AMQP_SASL_METHOD_PLAIN, amqp_info.user, amqp_info.password),
|
|
|
|
+ "amqp_login()");
|
|
|
|
+ if(log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
|
|
LM_ERR("FAIL: amqp_login()\n");
|
|
LM_ERR("FAIL: amqp_login()\n");
|
|
// amqp_destroy_connection(*conn);
|
|
// amqp_destroy_connection(*conn);
|
|
return RABBITMQ_ERR_CONSUME;
|
|
return RABBITMQ_ERR_CONSUME;
|
|
@@ -539,15 +554,16 @@ static int rabbitmq_connect(amqp_connection_state_t *conn) {
|
|
return RABBITMQ_OK;
|
|
return RABBITMQ_OK;
|
|
}
|
|
}
|
|
|
|
|
|
-static int rabbitmq_disconnect(amqp_connection_state_t *conn) {
|
|
|
|
|
|
+static int rabbitmq_disconnect(amqp_connection_state_t *conn)
|
|
|
|
+{
|
|
int log_ret;
|
|
int log_ret;
|
|
|
|
|
|
// sanity checks
|
|
// sanity checks
|
|
- if (!*conn) {
|
|
|
|
|
|
+ if(!*conn) {
|
|
return RABBITMQ_ERR_NULL;
|
|
return RABBITMQ_ERR_NULL;
|
|
}
|
|
}
|
|
|
|
|
|
-/*
|
|
|
|
|
|
+ /*
|
|
log_ret = log_on_amqp_error(amqp_connection_close(*conn, AMQP_REPLY_SUCCESS), "amqp_connection_close()");
|
|
log_ret = log_on_amqp_error(amqp_connection_close(*conn, AMQP_REPLY_SUCCESS), "amqp_connection_close()");
|
|
if (log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
|
|
if (log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
|
|
LM_ERR("FAIL: amqp_connection_close()\n");
|
|
LM_ERR("FAIL: amqp_connection_close()\n");
|
|
@@ -555,33 +571,35 @@ static int rabbitmq_disconnect(amqp_connection_state_t *conn) {
|
|
}
|
|
}
|
|
*/
|
|
*/
|
|
|
|
|
|
- log_ret = log_on_error(amqp_destroy_connection(*conn), "amqp_destroy_connection()");
|
|
|
|
- if (log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
|
|
|
|
|
|
+ log_ret = log_on_error(
|
|
|
|
+ amqp_destroy_connection(*conn), "amqp_destroy_connection()");
|
|
|
|
+ if(log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
|
|
LM_ERR("FAIL: amqp_destroy_connection()\n");
|
|
LM_ERR("FAIL: amqp_destroy_connection()\n");
|
|
return RABBITMQ_ERR_CONNECT;
|
|
return RABBITMQ_ERR_CONNECT;
|
|
}
|
|
}
|
|
|
|
|
|
return RABBITMQ_OK;
|
|
return RABBITMQ_OK;
|
|
}
|
|
}
|
|
-static int rabbitmq_reconnect(amqp_connection_state_t *conn) {
|
|
|
|
|
|
+static int rabbitmq_reconnect(amqp_connection_state_t *conn)
|
|
|
|
+{
|
|
int ret;
|
|
int ret;
|
|
|
|
|
|
// sanity checks
|
|
// sanity checks
|
|
- if (!*conn) {
|
|
|
|
|
|
+ if(!*conn) {
|
|
return RABBITMQ_ERR_NULL;
|
|
return RABBITMQ_ERR_NULL;
|
|
}
|
|
}
|
|
|
|
|
|
// disconnect from old RabbitMQ server
|
|
// disconnect from old RabbitMQ server
|
|
- if ((ret = rabbitmq_disconnect(conn)) != RABBITMQ_OK) {
|
|
|
|
|
|
+ if((ret = rabbitmq_disconnect(conn)) != RABBITMQ_OK) {
|
|
LM_NOTICE("FAIL rabbitmq_disconnect() in rabbitmq_reconnect()\n");
|
|
LM_NOTICE("FAIL rabbitmq_disconnect() in rabbitmq_reconnect()\n");
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
|
|
// connect to new RabbitMQ server
|
|
// connect to new RabbitMQ server
|
|
- if ((ret = rabbitmq_connect(conn)) != RABBITMQ_OK) {
|
|
|
|
|
|
+ if((ret = rabbitmq_connect(conn)) != RABBITMQ_OK) {
|
|
LM_NOTICE("FAIL rabbitmq_connect() in rabbitmq_reconnect()\n");
|
|
LM_NOTICE("FAIL rabbitmq_connect() in rabbitmq_reconnect()\n");
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
|
|
return RABBITMQ_OK;
|
|
return RABBITMQ_OK;
|
|
-}
|
|
|
|
|
|
+}
|