Luis Azedo 8 лет назад
Родитель
Сommit
b1a243e166
2 измененных файлов с 135 добавлено и 40 удалено
  1. 123 38
      src/modules/kazoo/kz_amqp.c
  2. 12 2
      src/modules/kazoo/kz_amqp.h

+ 123 - 38
src/modules/kazoo/kz_amqp.c

@@ -741,6 +741,9 @@ void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
     if (!rmq)
     	return;
 
+    if(rmq->heartbeat)
+        kz_amqp_timer_destroy(&rmq->heartbeat);
+
     kz_amqp_fire_connection_event("closed", rmq->server->connection->info.host);
 
     if (rmq->conn) {
@@ -2273,7 +2276,13 @@ int kz_amqp_connect(kz_amqp_conn_ptr rmq)
 			}
     	}
 
-    return 0;
+	if(dbk_use_hearbeats > 0) {
+		if(kz_amqp_timer_create(&rmq->heartbeat, dbk_use_hearbeats, kz_amqp_heartbeat_proc, rmq) != 0) {
+			LM_ERR("could not schedule heartbeats for the connection\n");
+		}
+	}
+
+	return 0;
 
 error:
 	kz_amqp_handle_server_failure(rmq);
@@ -2291,53 +2300,21 @@ void kz_amqp_reconnect_cb(int fd, short event, void *arg)
 		return;
 	}
 
-	if (connection->ev != NULL) {
-		event_del(connection->ev);
-		pkg_free(connection->ev);
-		connection->ev = NULL;
-	}
-
-	close(fd);
-	pkg_free(connection->timer);
-
+	kz_amqp_timer_destroy(&connection->reconnect);
 	kz_amqp_connect(connection);
 }
 
 int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection)
 {
+	int res = 0;
 
 	if(connection->state != KZ_AMQP_CONNECTION_CLOSED)
 		connection->state = KZ_AMQP_CONNECTION_FAILURE;
 
-	int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
-
-	if (timerfd == -1) {
-		LM_ERR("could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server.\n");
-		return -1;
+	if((res = kz_amqp_timer_create(&connection->reconnect, 5, kz_amqp_reconnect_cb, connection)) != 0) {
+		LM_ERR("could not reschedule connection. No further attempts will be made to reconnect this server.\n");
 	}
-
-	struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
-	itime->it_interval.tv_sec = 0;
-	itime->it_interval.tv_nsec = 0;
-
-	itime->it_value.tv_sec = 5;
-	itime->it_value.tv_nsec = 0;
-
-	if (timerfd_settime(timerfd, 0, itime, NULL) == -1)
-	{
-		LM_ERR("could not set timer to reschedule connection. No further attempts will be made to reconnect this server.\n");
-		return -1;
-	}
-	LM_DBG("timerfd value is %d\n", timerfd);
-	struct event *timer_ev = pkg_malloc(sizeof(struct event));
-	event_set(timer_ev, timerfd, EV_READ, kz_amqp_reconnect_cb, connection);
-	if(event_add(timer_ev, NULL) == -1) {
-		LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.\n", strerror(errno));
-		return -1;
-	}
-	connection->ev = timer_ev;
-	connection->timer = itime;
-	return 0;
+	return res;
 }
 
 int kz_amqp_publisher_send(kz_amqp_cmd_ptr cmd)
@@ -2847,3 +2824,111 @@ int kz_amqp_consumer_worker_proc(int cmd_pipe)
 	return 0;
 }
 
+void kz_amqp_timer_destroy(kz_amqp_timer_ptr* pTimer)
+{
+	kz_amqp_timer_ptr timer = *pTimer;
+	if (timer->ev != NULL) {
+		event_del(timer->ev);
+		pkg_free(timer->ev);
+		timer->ev = NULL;
+	}
+	close(timer->fd);
+	pkg_free(timer->timer);
+	pkg_free(timer);
+	*pTimer = NULL;
+}
+
+int kz_amqp_timer_create(kz_amqp_timer_ptr* pTimer, int seconds, void (*callback)(int, short, void *), void *data)
+{
+	kz_amqp_timer_ptr timer = NULL;
+	struct itimerspec *itime = NULL;
+	struct event *timer_ev = NULL;
+	int timerfd = 0;
+
+	timer = (kz_amqp_timer_ptr) pkg_malloc(sizeof(kz_amqp_timer));
+	if (!timer) {
+		LM_ERR("could not allocate timer struct.\n");
+		goto error;
+	}
+	memset(timer, 0, sizeof(kz_amqp_timer));
+
+	timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+	if (timerfd == -1) {
+		LM_ERR("could not create timer.\n");
+		goto error;
+	}
+
+	itime = pkg_malloc(sizeof(struct itimerspec));
+	if (!itime) {
+		LM_ERR("could not allocate itimerspec struct.\n");
+		goto error;
+	}
+	itime->it_interval.tv_sec = 0;
+	itime->it_interval.tv_nsec = 0;
+	itime->it_value.tv_sec = seconds;
+	itime->it_value.tv_nsec = 0;
+
+	if (timerfd_settime(timerfd, 0, itime, NULL) == -1) {
+		LM_ERR("could not set timer for %i seconds in %i\n", seconds, timerfd);
+		goto error;
+	}
+
+	LM_DBG("timerfd value is %d\n", timerfd);
+	timer_ev = pkg_malloc(sizeof(struct event));
+	if (!timer_ev) {
+		LM_ERR("could not allocate event struct.\n");
+		goto error;
+	}
+	event_set(timer_ev, timerfd, EV_READ | EV_PERSIST, callback, data);
+	if (event_add(timer_ev, NULL) == -1) {
+		LM_ERR("event_add failed while creating timer (%s).\n", strerror(errno));
+		goto error;
+	}
+
+	timer->ev = timer_ev;
+	timer->timer = itime;
+	timer->fd = timerfd;
+	*pTimer = timer;
+
+	return 0;
+
+error: 
+
+	if (timer_ev)
+		pkg_free(timer_ev);
+
+	if (itime)
+		pkg_free(itime);
+
+	if (timerfd > 0)
+		close(timerfd);
+
+	if (timer)
+		pkg_free(timer);
+
+	*pTimer = NULL;
+
+	return -1;
+}
+
+void kz_amqp_heartbeat_proc(int fd, short event, void *arg)
+{
+	int res;
+	amqp_frame_t heartbeat;
+	kz_amqp_conn_ptr connection = (kz_amqp_conn_ptr) arg;
+	LM_DBG("sending heartbeat to zone : %s , connection id : %d\n", connection->server->zone->zone, connection->server->id);
+	if (connection->state != KZ_AMQP_CONNECTION_OPEN) {
+		kz_amqp_timer_destroy(&connection->heartbeat);
+		return;
+	}
+	heartbeat.channel = 0;
+	heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
+	res = amqp_send_frame(connection->conn, &heartbeat);
+	if (res != AMQP_STATUS_OK) {
+		LM_ERR("error sending heartbeat to zone : %s , connection id : %d\n", connection->server->zone->zone, connection->server->id);
+		kz_amqp_timer_destroy(&connection->heartbeat);
+		kz_amqp_handle_server_failure(connection);
+		return;
+	}
+	timerfd_settime(connection->heartbeat->fd, 0, connection->heartbeat->timer, NULL);
+}

+ 12 - 2
src/modules/kazoo/kz_amqp.h

@@ -82,12 +82,18 @@ typedef struct kz_amqp_connection_t {
 	char* url;
 } kz_amqp_connection, *kz_amqp_connection_ptr;
 
+typedef struct kz_amqp_timer_t {
+	struct event *ev;
+	struct itimerspec *timer;
+	int    fd;
+} kz_amqp_timer, *kz_amqp_timer_ptr;
+
 typedef struct kz_amqp_conn_t {
 	struct kz_amqp_server_t* server;
 	amqp_connection_state_t conn;
 	kz_amqp_connection_state state;
-	struct event *ev;
-	struct itimerspec *timer;
+	kz_amqp_timer_ptr reconnect;
+	kz_amqp_timer_ptr heartbeat;
 	amqp_socket_t *socket;
 	amqp_channel_t channel_count;
 	amqp_channel_t channel_counter;
@@ -269,6 +275,10 @@ void kz_amqp_fire_connection_event(char *event, char* host);
 
 void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd);
 
+void kz_amqp_timer_destroy(kz_amqp_timer_ptr* pTimer);
+int kz_amqp_timer_create(kz_amqp_timer_ptr* pTimer, int seconds, void (*callback)(int, short, void *), void *data);
+void kz_amqp_heartbeat_proc(int fd, short event, void *arg);
+
 static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x)
 {
 	amqp_connection_close_t *mconn;