Pārlūkot izejas kodu

erlang: fix function call in event route

- allow call functions for RPC and message send in event route
- detect RPC loop
- function/api call is possible in event route if event route is not
triggered by handling REX call (avoid deadlock)
Seudin Kasumovic 9 gadi atpakaļ
vecāks
revīzija
b0c04dd2ef

+ 87 - 1
modules/erlang/erl_api.c

@@ -83,6 +83,48 @@ int _impl_api_rpc_call(ei_x_buff *reply, const str *module,const str *function,
 	int buffsz=0;
 	int rc;
 
+	if (!csockfd) {
+
+		if (!enode) {
+			LM_NOTICE("there is no connected Erlang node\n");
+			/* reply up with error */
+			ei_x_format(reply, "{error,cnode,~a}", "no_erlang_node");
+			return -1;
+		}
+
+		if (rex_call_in_progress) {
+			LM_ERR("RPC loop detected\n");
+			ei_x_format(reply, "{badrpc,cnode,~a}", "rpc_loop_detected");
+			return -1;
+		}
+
+		/* do RPC from event route */
+		if (ei_rpc(&enode->ec,enode->sockfd,module->s,function->s,args->buff,args->index,reply) == ERL_ERROR)
+		{
+			reply->index = 0; /* re-use reply buffer */
+
+			if (erl_errno)
+			{
+				ei_x_format(reply, "{error,cnode,~s}", strerror(erl_errno));
+				LM_ERR("ei_rpc failed on node=<%s> socket=<%d>: %s\n",enode->conn.nodename,enode->sockfd,strerror(erl_errno));
+			}
+			else if (errno)
+			{
+				ei_x_format(reply, "{error,cnode,~s}", strerror(errno));
+				LM_ERR("ei_rpc failed on node=<%s> socket=<%d>: %s\n",enode->conn.nodename,enode->sockfd,strerror(errno));
+			}
+			else
+			{
+				ei_x_format(reply, "{error,cnode,~s}", "Unknown error.");
+				LM_ERR("ei_rpc failed on node=<%s> socket=<%d>, Unknown error.\n",enode->ec.thisalivename,enode->sockfd);
+			}
+			return -1;
+		}
+		/* reset response */
+		enode->response.index = 0;
+		return 0;
+	}
+
 	memset(&msgh, 0, sizeof(msgh));
 	memset(&cnt, 0, sizeof(cnt));
 
@@ -187,6 +229,39 @@ int _impl_reg_send(const str *server, const ei_x_buff *msg)
 		return -1;
 	}
 
+	if (!csockfd) {
+
+		if (!enode) {
+			LM_NOTICE("there is no connected Erlang node\n");
+			return -1;
+		}
+
+		if (rex_call_in_progress) {
+			LM_ERR("RPC in progress detected\n");
+			return -1;
+		}
+
+		/* do ERL_REG_SEND in event route */
+		if ((rc = ei_reg_send(&enode->ec,enode->sockfd,server->s,msg->buff,msg->buffsz)) == ERL_ERROR)
+		{
+			if (erl_errno)
+			{
+				LM_ERR("ei_reg_send failed on node=<%s> socket=<%d>: %s\n",enode->conn.nodename,enode->sockfd,strerror(erl_errno));
+			}
+			else if (errno)
+			{
+				LM_ERR("ei_reg_send failed on node=<%s> socket=<%d>: %s\n",enode->conn.nodename,enode->sockfd,strerror(errno));
+			}
+			else
+			{
+				LM_ERR("ei_reg_send failed on node=<%s> socket=<%d>, Unknown error.\n",enode->ec.thisalivename,enode->sockfd);
+			}
+		}
+		/* reset response */
+		enode->response.index = 0;
+		return 0;
+	}
+
 	/* Kamailio PID */
 	cnt[0].iov_base = (void*)&pid_no;
 	cnt[0].iov_len  = sizeof(pid_no);
@@ -245,6 +320,12 @@ int _impl_reply(const ei_x_buff *msg)
 		LM_ERR("not connected\n");
 		return -1;
 	}
+
+	if (rex_call_in_progress) {
+		LM_ERR("RPC in progress detected\n");
+		return -1;
+	}
+
 	/* copy into reply */
 	if (enode->response.buffsz < msg->buffsz) {
 		/* realocate */
@@ -277,11 +358,16 @@ int _impl_send(const erlang_pid *pid, const ei_x_buff *msg)
 		return -1;
 	}
 
+	if (rex_call_in_progress) {
+		LM_ERR("RPC in progress detected\n");
+		return -1;
+	}
+
 	if (enode) {
 
 		/* copy into reply */
 		if (enode->response.buffsz < msg->buffsz) {
-			/* realocate */
+			/* reallocate */
 			enode->response.buff=realloc(enode->response.buff,msg->buffsz);
 			if (!enode->response.buff) {
 				LM_ERR("realloc failed: not enough memory\n");

+ 6 - 8
modules/erlang/handle_emsg.c

@@ -571,7 +571,9 @@ int handle_rex_call(cnode_handler_t *phandler,erlang_ref_ex_t *ref, erlang_pid *
 	ctx.size = arity;
 
 	/* call rpc */
+	rex_call_in_progress = 1;
 	exp->function(&erl_rpc_func_param,(void*)&ctx);
+	rex_call_in_progress = 0;
 
 	if (ctx.no_params)
 	{
@@ -813,19 +815,15 @@ int handle_erlang_msg(cnode_handler_t *phandler, erlang_msg * msg)
 			LM_ERR("ei_send failed on node=<%s> socket=<%d>, %s\n",
 					phandler->ec.thisnodename,phandler->sockfd, strerror(erl_errno));
 		}
-
-		/* reset pid */
-		cnode_reply_to_pid = NULL;
-		return ret;
 	}
 	else
 	{
 		LM_DBG("** no reply **\n");
-
-		/* reset pid */
-		cnode_reply_to_pid = NULL;
-		return 0;
 	}
+
+	/* reset pid */
+	cnode_reply_to_pid = NULL;
+	return 0;
 }
 
 void encode_error_msg(ei_x_buff *response, erlang_ref_ex_t *ref, const char *type, const char *msg )

+ 2 - 0
modules/erlang/handle_emsg.h

@@ -27,6 +27,8 @@
 #include "erl_helpers.h"
 #include "cnode.h"
 
+extern int rex_in_progress;
+
 int handle_erlang_msg(cnode_handler_t *handler, erlang_msg * msg);
 
 #endif /* HANDLE_EMSG_H_ */

+ 2 - 0
modules/erlang/mod_erlang.c

@@ -88,6 +88,8 @@ int rpc_reply_with_struct = 0;
 str erlang_nodename  = STR_NULL;
 str erlang_node_sname = STR_NULL;
 
+int rex_call_in_progress = 0;
+
 int *usocks[2];
 int csockfd;
 

+ 2 - 0
modules/erlang/mod_erlang.h

@@ -39,6 +39,8 @@ extern int rpc_reply_with_struct;
 extern str erlang_nodename;
 extern str erlang_node_sname;
 
+extern int rex_call_in_progress;
+
 /* sockets kamailio <-> cnode */
 extern int *usocks[2];