|
@@ -708,10 +708,11 @@ error:
|
|
|
|
|
|
void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
|
|
|
LM_DBG("Close rmq connection\n");
|
|
|
- kz_amqp_fire_connection_event("closed", rmq->server->connection->info.host);
|
|
|
if (!rmq)
|
|
|
return;
|
|
|
|
|
|
+ kz_amqp_fire_connection_event("closed", rmq->server->connection->info.host);
|
|
|
+
|
|
|
if (rmq->conn) {
|
|
|
LM_DBG("close connection: %d rmq(%p)->conn(%p)\n", getpid(), (void *)rmq, rmq->conn);
|
|
|
kz_amqp_error("closing connection", amqp_connection_close(rmq->conn, AMQP_REPLY_SUCCESS));
|
|
@@ -722,6 +723,7 @@ void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
|
|
|
rmq->socket = NULL;
|
|
|
rmq->channel_count = 0;
|
|
|
}
|
|
|
+ rmq->state = KZ_AMQP_CONNECTION_CLOSED;
|
|
|
|
|
|
}
|
|
|
|
|
@@ -751,7 +753,7 @@ int kz_amqp_connection_open_ssl(kz_amqp_conn_ptr rmq) {
|
|
|
rmq->socket = amqp_ssl_socket_new(rmq->conn);
|
|
|
if (!rmq->socket) {
|
|
|
LM_ERR("Failed to create SSL socket to AMQP broker\n");
|
|
|
- goto error;
|
|
|
+ goto nosocket;
|
|
|
}
|
|
|
|
|
|
if (kz_amqps_ca_cert.s) {
|
|
@@ -801,6 +803,8 @@ nosocket:
|
|
|
if (amqp_destroy_connection(rmq->conn) < 0) {
|
|
|
LM_ERR("cannot destroy connection\n");
|
|
|
}
|
|
|
+
|
|
|
+ rmq->conn = NULL;
|
|
|
return -1;
|
|
|
|
|
|
error:
|
|
@@ -824,12 +828,12 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) {
|
|
|
rmq->socket = amqp_tcp_socket_new(rmq->conn);
|
|
|
if (!rmq->socket) {
|
|
|
LM_DBG("Failed to create TCP socket to AMQP broker\n");
|
|
|
- goto error;
|
|
|
+ goto nosocket;
|
|
|
}
|
|
|
|
|
|
if (amqp_socket_open(rmq->socket, rmq->server->connection->info.host, rmq->server->connection->info.port)) {
|
|
|
LM_DBG("Failed to open TCP socket to AMQP broker\n");
|
|
|
- goto error;
|
|
|
+ goto nosocket;
|
|
|
}
|
|
|
|
|
|
if (kz_amqp_error("Logging in", amqp_login(rmq->conn,
|
|
@@ -848,7 +852,14 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) {
|
|
|
rmq->state = KZ_AMQP_CONNECTION_OPEN;
|
|
|
return 0;
|
|
|
|
|
|
- error:
|
|
|
+nosocket:
|
|
|
+ if (amqp_destroy_connection(rmq->conn) < 0) {
|
|
|
+ LM_ERR("cannot destroy connection\n");
|
|
|
+ }
|
|
|
+ rmq->conn = NULL;
|
|
|
+ return -1;
|
|
|
+
|
|
|
+error:
|
|
|
kz_amqp_connection_close(rmq);
|
|
|
return -1;
|
|
|
}
|
|
@@ -2239,11 +2250,11 @@ error:
|
|
|
|
|
|
void kz_amqp_reconnect_cb(int fd, short event, void *arg)
|
|
|
{
|
|
|
- LM_INFO("Attempting to reconnect now.");
|
|
|
+ LM_DBG("attempting to reconnect now.\n");
|
|
|
kz_amqp_conn_ptr connection = (kz_amqp_conn_ptr)arg;
|
|
|
|
|
|
if (connection->state == KZ_AMQP_CONNECTION_OPEN) {
|
|
|
- LM_WARN("Trying to connect an already connected server.");
|
|
|
+ LM_WARN("trying to connect an already connected server.\n");
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -2262,11 +2273,13 @@ void kz_amqp_reconnect_cb(int fd, short event, void *arg)
|
|
|
int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection)
|
|
|
{
|
|
|
|
|
|
- connection->state = KZ_AMQP_CONNECTION_FAILURE;
|
|
|
+ if(connection->state != KZ_AMQP_CONNECTION_CLOSED)
|
|
|
+ connection->state = KZ_AMQP_CONNECTION_FAILURE;
|
|
|
+
|
|
|
int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
|
|
|
|
|
|
if (timerfd == -1) {
|
|
|
- LM_ERR("Could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server.");
|
|
|
+ LM_ERR("could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server.\n");
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
@@ -2279,14 +2292,14 @@ int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection)
|
|
|
|
|
|
if (timerfd_settime(timerfd, 0, itime, NULL) == -1)
|
|
|
{
|
|
|
- LM_ERR("Could not set timer to reschedule connection. No further attempts will be made to reconnect this server.");
|
|
|
+ LM_ERR("could not set timer to reschedule connection. No further attempts will be made to reconnect this server.\n");
|
|
|
return -1;
|
|
|
}
|
|
|
- LM_INFO("timerfd value is %d\n", timerfd);
|
|
|
+ LM_DBG("timerfd value is %d\n", timerfd);
|
|
|
struct event *timer_ev = pkg_malloc(sizeof(struct event));
|
|
|
event_set(timer_ev, timerfd, EV_READ, kz_amqp_reconnect_cb, connection);
|
|
|
if(event_add(timer_ev, NULL) == -1) {
|
|
|
- LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.", strerror(errno));
|
|
|
+ LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.\n", strerror(errno));
|
|
|
return -1;
|
|
|
}
|
|
|
connection->ev = timer_ev;
|
|
@@ -2650,7 +2663,6 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
|
|
|
while(1) {
|
|
|
OK = 1;
|
|
|
if(kz_amqp_connection_open(consumer)) {
|
|
|
- LM_ERR("Error opening connection\n");
|
|
|
sleep(3);
|
|
|
continue;
|
|
|
}
|