Quellcode durchsuchen

Merge pull request #144 from kamailio/seudin/erlang_connect_fix

erlang: Reconnect after lost connection
Seudin Kasumovic vor 10 Jahren
Ursprung
Commit
a02299d514
3 geänderte Dateien mit 82 neuen und 7 gelöschten Zeilen
  1. 76 7
      modules/erlang/cnode.c
  2. 3 0
      modules/erlang/cnode.h
  3. 3 0
      modules/erlang/worker.c

+ 76 - 7
modules/erlang/cnode.c

@@ -45,6 +45,7 @@
 static io_wait_h io_h;
 
 cnode_handler_t *enode = NULL;
+csockfd_handler_t *csocket_handler = NULL;
 
 /**
  * @brief Initialize Kamailio as C node by active connect as client.
@@ -239,6 +240,8 @@ int init_cnode_sockets(int cnode_id)
 void cnode_main_loop(int cnode_id)
 {
 	char _cnode_name[MAXNODELEN];
+	str *enode_name;
+	int r;
 
 	if (snprintf(_cnode_name, MAXNODELEN, "%.*s%d@%.*s", STR_FMT(&cnode_alivename), cnode_id+1,STR_FMT(&cnode_host)) >= MAXNODELEN)
 	{
@@ -255,50 +258,73 @@ void cnode_main_loop(int cnode_id)
 		return;
 	}
 
+	enode_name = erlang_nodename.s?&erlang_nodename:&erlang_node_sname;
+
 	/* main loop */
 	switch(io_h.poll_method){
 		case POLL_POLL:
 			while(1){
-				io_wait_loop_poll(&io_h, IO_LISTEN_TIMEOUT, 0);
+				r = io_wait_loop_poll(&io_h, IO_LISTEN_TIMEOUT, 0);
+				if(!r && enode_connect()) {
+					LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name));
+				}
 			}
 			break;
 #ifdef HAVE_SELECT
 		case POLL_SELECT:
 			while(1){
-				io_wait_loop_select(&io_h, IO_LISTEN_TIMEOUT, 0);
+				r = io_wait_loop_select(&io_h, IO_LISTEN_TIMEOUT, 0);
+				if(!r && enode_connect()) {
+					LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name));
+				}
 			}
 			break;
 #endif
 #ifdef HAVE_SIGIO_RT
 		case POLL_SIGIO_RT:
 			while(1){
-				io_wait_loop_sigio_rt(&io_h, IO_LISTEN_TIMEOUT);
+				r = io_wait_loop_sigio_rt(&io_h, IO_LISTEN_TIMEOUT);
+				if(!r && enode_connect()) {
+					LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name));
+				}
 			}
 			break;
 #endif
 #ifdef HAVE_EPOLL
 			case POLL_EPOLL_LT:
 				while(1){
-					io_wait_loop_epoll(&io_h, IO_LISTEN_TIMEOUT, 0);
+					r = io_wait_loop_epoll(&io_h, IO_LISTEN_TIMEOUT, 0);
+					if(!r && enode_connect()) {
+						LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name));
+					}
 				}
 				break;
 			case POLL_EPOLL_ET:
 				while(1){
-					io_wait_loop_epoll(&io_h, IO_LISTEN_TIMEOUT, 1);
+					r = io_wait_loop_epoll(&io_h, IO_LISTEN_TIMEOUT, 1);
+					if(!r && enode_connect()) {
+						LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name));
+					}
 				}
 				break;
 #endif
 #ifdef HAVE_KQUEUE
 			case POLL_KQUEUE:
 				while(1){
-					io_wait_loop_kqueue(&io_h, IO_LISTEN_TIMEOUT, 0);
+					r = io_wait_loop_kqueue(&io_h, IO_LISTEN_TIMEOUT, 0);
+					if(!r && enode_connect()) {
+						LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name));
+					}
 				}
 				break;
 #endif
 #ifdef HAVE_DEVPOLL
                 case POLL_DEVPOLL:
                 	while(1){
-                		io_wait_loop_devpoll(&io_h, IO_LISTEN_TIMEOUT, 0);
+						r = io_wait_loop_devpoll(&io_h, IO_LISTEN_TIMEOUT, 0);
+						if(!r && enode_connect()) {
+							LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name));
+						}
                 	}
                 	break;
 #endif
@@ -486,6 +512,8 @@ int csockfd_init(csockfd_handler_t *phandler, const ei_cnode *ec)
 
 	erl_set_nonblock(csockfd);
 
+	csocket_handler = phandler;
+
 	return 0;
 }
 
@@ -512,3 +540,44 @@ int handle_csockfd(handler_common_t *phandler_t)
 
 	return worker_init((worker_handler_t*)phandler->new,fd,&phandler_t->ec);
 }
+
+/*
+ * \brief Connect to the Erlang node if not already connected.
+ *
+ * Does nothing when the global enode handler is already set. Otherwise
+ * allocates a new handler, registers it with io_handler_ins(), tries
+ * cnode_connect_to() using the ei_cnode kept in csocket_handler, and on
+ * success adds the new socket to the I/O watch list of this process.
+ *
+ * \return 0 if enode is already set, if the connection was set up, or if
+ *         the connect attempt itself failed (the handler is then passed to
+ *         io_handler_del() and no error is reported to the caller);
+ *         -1 if csocket_handler is not set, if memory allocation fails,
+ *         or if io_watch_add() fails.
+ */
+int enode_connect() {
+
+	handler_common_t *phandler;
+
+	if (!csocket_handler) {
+		return -1;
+	}
+
+	if (enode) {
+		return 0;
+	}
+
+	LM_DBG("not connected, trying to connect...\n");
+
+	phandler = (handler_common_t*)pkg_malloc(sizeof(cnode_handler_t));
+
+	if (!phandler) {
+		LM_CRIT("not enough memory\n");
+		return -1;
+	}
+
+	io_handler_ins(phandler);
+
+	/* connect to the remote Erlang node; erlang_nodename is used when it
+	 * is set, otherwise erlang_node_sname */
+	if (cnode_connect_to((cnode_handler_t*)phandler,&csocket_handler->ec, erlang_nodename.s?&erlang_nodename:&erlang_node_sname)) {
+		/* a failed connect is deliberately not treated as an error: the
+		 * connection can be established from the Erlang side too, so the
+		 * handler is removed again and 0 is still returned below */
+		io_handler_del(phandler);
+	} else if (io_watch_add(&io_h,phandler->sockfd,POLLIN,ERL_CNODE_H,phandler)){
+		LM_CRIT("io_watch_add failed\n");
+		erl_close_socket(phandler->sockfd);
+		io_handler_del(phandler);
+		return -1;
+	}
+
+	return 0;
+}

+ 3 - 0
modules/erlang/cnode.h

@@ -84,6 +84,8 @@ typedef struct csockfd_handler_s
 	ei_cnode ec; /* erlang C node (actually it's kamailio node) */
 } csockfd_handler_t;
 
+extern csockfd_handler_t *csocket_handler;
+
 int init_cnode_sockets(int idx);
 void cnode_main_loop(int idx);
 
@@ -95,6 +97,7 @@ int wait_cnode_tmo(handler_common_t *phandler_t);
 int destroy_cnode(handler_common_t *phandler_t);
 
 int cnode_connect_to(cnode_handler_t *phandler, ei_cnode *ec, const str *nodename );
+int enode_connect();
 
 enum erl_handle_type {
 	ERL_EPMD_H = 1,

+ 3 - 0
modules/erlang/worker.c

@@ -56,6 +56,9 @@ int handle_worker(handler_common_t *phandler)
 	eapi_t api;
 	int rc;
 
+	/* make sure we are connected to the Erlang node */
+	enode_connect();
+
 	memset((void*)&msg,0,sizeof(msg));
 
 	/* Kamailio worker PID */