Procházet zdrojové kódy

Merge branch 'seudin/erl_send'

Seudin Kasumovic před 10 roky
rodič
revize
e15e6d8cfd

+ 75 - 24
modules/erlang/README

@@ -47,7 +47,8 @@ Seudin Kasumovic
 
 
               5.1. erl_rpc(mod,fun,args,reply)
               5.1. erl_rpc(mod,fun,args,reply)
               5.2. erl_reg_send(server,msg)
               5.2. erl_reg_send(server,msg)
-              5.3. erl_reply(msg)
+              5.3. erl_send(pid,msg)
+              5.4. erl_reply(msg)
 
 
         6. Event routes
         6. Event routes
 
 
@@ -65,9 +66,10 @@ Seudin Kasumovic
               1.1. erl_load_api(erl_api)
               1.1. erl_load_api(erl_api)
               1.2. rpc(reply,module,function,args)
               1.2. rpc(reply,module,function,args)
               1.3. reg_send(server,msg)
               1.3. reg_send(server,msg)
-              1.4. reply(msg)
-              1.5. xavp2xbuff(xbuff,xavp)
-              1.6. xbuff2xavp(xavp,xbuff)
+              1.4. send(pid,msg)
+              1.5. reply(msg)
+              1.6. xavp2xbuff(xbuff,xavp)
+              1.7. xbuff2xavp(xavp,xbuff)
 
 
    List of Examples
    List of Examples
 
 
@@ -86,9 +88,10 @@ Seudin Kasumovic
    1.13. Example of using xbuff
    1.13. Example of using xbuff
    1.14. Example of using erl_rpc
    1.14. Example of using erl_rpc
    1.15. Example of using erl_reg_send
    1.15. Example of using erl_reg_send
-   1.16. Example of use erl_reply
-   1.17. Example of registered process
-   1.18. Example of using default event route
+   1.16. Example of using erl_send
+   1.17. Example of use erl_reply
+   1.18. Example of registered process
+   1.19. Example of using default event route
    2.1. Example of RPC call from erlang shell with no response
    2.1. Example of RPC call from erlang shell with no response
    2.2. Example, check is line registered
    2.2. Example, check is line registered
    2.3. Example get config variable
    2.3. Example get config variable
@@ -129,7 +132,8 @@ Chapter 1. Admin Guide
 
 
         5.1. erl_rpc(mod,fun,args,reply)
         5.1. erl_rpc(mod,fun,args,reply)
         5.2. erl_reg_send(server,msg)
         5.2. erl_reg_send(server,msg)
-        5.3. erl_reply(msg)
+        5.3. erl_send(pid,msg)
+        5.4. erl_reply(msg)
 
 
    6. Event routes
    6. Event routes
 
 
@@ -455,7 +459,8 @@ DEBUG: <script>: 410:typeof(X): tuple, length(X): 2, format(X): {line, [{id, 23}
 
 
    5.1. erl_rpc(mod,fun,args,reply)
    5.1. erl_rpc(mod,fun,args,reply)
    5.2. erl_reg_send(server,msg)
    5.2. erl_reg_send(server,msg)
-   5.3. erl_reply(msg)
+   5.3. erl_send(pid,msg)
+   5.4. erl_reply(msg)
 
 
 5.1.  erl_rpc(mod,fun,args,reply)
 5.1.  erl_rpc(mod,fun,args,reply)
 
 
@@ -518,12 +523,38 @@ $tuple(M) = $atom(example);
 erl_reg_send("notifier","$tuple(M)");
 erl_reg_send("notifier","$tuple(M)");
 ...
 ...
 
 
-5.3. erl_reply(msg)
+5.3. erl_send(pid,msg)
+
+   This function sends an Erlang term to a process. This function can be
+   used from ANY_ROUTE. The argument pid is the Erlang process id of the
+   intended recipient process on remote node. The argument msg is
+   containing the message to be sent.
+
+   Example 1.16. Example of using erl_send
+...
+# example of send message to process
+# Pid ! {example,message}
+
+$atom(notifier) = "notifier";
+$list(args) = $atom(notifier);
+
+erl_rpc("erlang", "whereis", "$list(args)", "$xbuff(pid)");
+
+$atom(example) = "example";
+$atom(message) = "message";
+
+$tuple(M) = $atom(message);
+$tuple(M) = $atom(example);
+
+erl_send("$xbuff(pid)","$tuple(M)");
+...
+
+5.4. erl_reply(msg)
 
 
    Function to send message from event route (pseudo process). Function
    Function to send message from event route (pseudo process). Function
    sends reply message msg to the sender process.
    sends reply message msg to the sender process.
 
 
-   Example 1.16. Example of use erl_reply
+   Example 1.17. Example of use erl_reply
 ...
 ...
 # event route acts as registered process
 # event route acts as registered process
 event_route[erlang:greetings] {
 event_route[erlang:greetings] {
@@ -569,7 +600,7 @@ INFO: <script>: 951:Received message: {"hello", "Kamailio"}
    event route in form of event_route[erlang:<my_process_name>]. Where
    event route in form of event_route[erlang:<my_process_name>]. Where
    <my_process_name> is the name of pseudo process.
    <my_process_name> is the name of pseudo process.
 
 
-   Example 1.17. Example of registered process
+   Example 1.18. Example of registered process
 ...
 ...
 # event route acts as registered process
 # event route acts as registered process
 event_route[erlang:handler] {
 event_route[erlang:handler] {
@@ -593,13 +624,17 @@ INFO: <script>: 951:Received message: {"hello", "Kamailio"}
    The reserved pseudo process name to receive messages sent to Kamailio C
    The reserved pseudo process name to receive messages sent to Kamailio C
    node. The message are sent to non registered process.
    node. The message are sent to non registered process.
 
 
-   Example 1.18. Example of using default event route
+   Example 1.19. Example of using default event route
 ...
 ...
 # default event route from erlang
 # default event route from erlang
 event_route[erlang:self] {
 event_route[erlang:self] {
 
 
         xlogl("L_INFO","Received message: $xbuff(msg=>format)\n");
         xlogl("L_INFO","Received message: $xbuff(msg=>format)\n");
 
 
+        if(pv_isset("$xbuff(msg[1])") && $xbuff(msg[1]=>type) == "pid") {
+                xlogl("L_INFO","Echo reply to: $xbuff(msg[1]=>format)\n");
+                erl_send("$xbuff(msg[1])","$xbuff(msg[0])");
+        }
 }
 }
 ...
 ...
 
 
@@ -609,9 +644,13 @@ event_route[erlang:self] {
 <14808.9.0>
 <14808.9.0>
 ([email protected])14> Pid ! ["hello from",self()].
 ([email protected])14> Pid ! ["hello from",self()].
 ["hello from",<0.247.0>]
 ["hello from",<0.247.0>]
+([email protected])15> flush().
+Shell got "hello from"
+ok
 
 
-> logged info message:
-INFO: <script>: 957:Received message: ["hello from", "<[email protected]>"]
+> logged info messages:
+INFO: <script>: 653:Received message: ["hello from", <[email protected]>]
+INFO: <script>: 656:Echo reply to: <[email protected]>]
 >
 >
 
 
 Chapter 2. Using Kamailio from Erlang
 Chapter 2. Using Kamailio from Erlang
@@ -683,18 +722,20 @@ Chapter 3. Developer Guide
         1.1. erl_load_api(erl_api)
         1.1. erl_load_api(erl_api)
         1.2. rpc(reply,module,function,args)
         1.2. rpc(reply,module,function,args)
         1.3. reg_send(server,msg)
         1.3. reg_send(server,msg)
-        1.4. reply(msg)
-        1.5. xavp2xbuff(xbuff,xavp)
-        1.6. xbuff2xavp(xavp,xbuff)
+        1.4. send(pid,msg)
+        1.5. reply(msg)
+        1.6. xavp2xbuff(xbuff,xavp)
+        1.7. xbuff2xavp(xavp,xbuff)
 
 
 1. Available Functions
 1. Available Functions
 
 
    1.1. erl_load_api(erl_api)
    1.1. erl_load_api(erl_api)
    1.2. rpc(reply,module,function,args)
    1.2. rpc(reply,module,function,args)
    1.3. reg_send(server,msg)
    1.3. reg_send(server,msg)
-   1.4. reply(msg)
-   1.5. xavp2xbuff(xbuff,xavp)
-   1.6. xbuff2xavp(xavp,xbuff)
+   1.4. send(pid,msg)
+   1.5. reply(msg)
+   1.6. xavp2xbuff(xbuff,xavp)
+   1.7. xbuff2xavp(xavp,xbuff)
 
 
 1.1.  erl_load_api(erl_api)
 1.1.  erl_load_api(erl_api)
 
 
@@ -726,14 +767,24 @@ Chapter 3. Developer Guide
      * ei_x_buff *msg - dynamic ei buffer with encoded erlang term. The
      * ei_x_buff *msg - dynamic ei buffer with encoded erlang term. The
        msg must be encoded with version byte.
        msg must be encoded with version byte.
 
 
-1.4.  reply(msg)
+1.4.  send(pid,msg)
+
+   This function sends an Erlang term to a process. On success return 0.
+
+   Meaning of parameters is as follows:
+     * erlang_pid *pid - pid of the intended recipient process on remote
+       node.
+     * ei_x_buff *msg - dynamic ei buffer with encoded erlang term. The
+       msg must be encoded with version byte.
+
+1.5.  reply(msg)
 
 
    Function to send reply on processed message.
    Function to send reply on processed message.
 
 
    Meaning of parameters is as follows:
    Meaning of parameters is as follows:
      * ei_x_buff *msg - dynamic ei buffer with encoded erlang term.
      * ei_x_buff *msg - dynamic ei buffer with encoded erlang term.
 
 
-1.5.  xavp2xbuff(xbuff,xavp)
+1.6.  xavp2xbuff(xbuff,xavp)
 
 
    Function encodes XAVP variable into ei dynamic buffer. How to create
    Function encodes XAVP variable into ei dynamic buffer. How to create
    XAVP variable see source code.
    XAVP variable see source code.
@@ -743,7 +794,7 @@ Chapter 3. Developer Guide
        encoded.
        encoded.
      * sr_xavp_t *xavp - XAVP variable to be encoded.
      * sr_xavp_t *xavp - XAVP variable to be encoded.
 
 
-1.6.  xbuff2xavp(xavp,xbuff)
+1.7.  xbuff2xavp(xavp,xbuff)
 
 
    Function decodes ei dynamic buffer into XAVP variable.
    Function decodes ei dynamic buffer into XAVP variable.
 
 

+ 1 - 0
modules/erlang/cnode.c

@@ -46,6 +46,7 @@ static io_wait_h io_h;
 
 
 cnode_handler_t *enode = NULL;
 cnode_handler_t *enode = NULL;
 csockfd_handler_t *csocket_handler = NULL;
 csockfd_handler_t *csocket_handler = NULL;
+erlang_pid *cnode_reply_to_pid = NULL;
 
 
 /**
 /**
  * @brief Initialize Kamailo as C node by active connect as client.
  * @brief Initialize Kamailo as C node by active connect as client.

+ 3 - 0
modules/erlang/cnode.h

@@ -69,6 +69,9 @@ typedef struct cnode_handler_s
  */
  */
 extern cnode_handler_t *enode;
 extern cnode_handler_t *enode;
 
 
+/* where to reply - reset to NULL after use */
+extern erlang_pid *cnode_reply_to_pid;
+
 typedef struct csockfd_handler_s
 typedef struct csockfd_handler_s
 {
 {
 	/* d-linked list  */
 	/* d-linked list  */

+ 42 - 2
modules/erlang/doc/erlang_admin.xml

@@ -519,6 +519,38 @@ $tuple(M) = $atom(message);
 $tuple(M) = $atom(example);
 $tuple(M) = $atom(example);
 
 
 erl_reg_send("notifier","$tuple(M)");
 erl_reg_send("notifier","$tuple(M)");
+...
+				</programlisting>
+			</example>
+		</section>
+		<section id="erlang.f.erl_send">
+			<title><function moreinfo="none">erl_send(pid,msg)</function></title>
+			<para>
+				This function sends an Erlang term to a process. This function
+				can be used from ANY_ROUTE. The argument <emphasis>pid</emphasis>
+				is the Erlang process id of the intended recipient process on
+				remote node. The argument <emphasis>msg</emphasis> is containing
+				the message to be sent.
+			</para>
+			<example>
+				<title>Example of using erl_send</title>
+				<programlisting format="linespecific">
+...
+# example of send message to process
+# Pid ! {example,message}
+
+$atom(notifier) = "notifier";
+$list(args) = $atom(notifier);
+
+erl_rpc("erlang", "whereis", "$list(args)", "$xbuff(pid)");
+
+$atom(example) = "example";
+$atom(message) = "message";
+
+$tuple(M) = $atom(message);
+$tuple(M) = $atom(example);
+
+erl_send("$xbuff(pid)","$tuple(M)");
 ...
 ...
 				</programlisting>
 				</programlisting>
 			</example>
 			</example>
@@ -620,6 +652,10 @@ event_route[erlang:self] {
 
 
 	xlogl("L_INFO","Received message: $xbuff(msg=>format)\n");
 	xlogl("L_INFO","Received message: $xbuff(msg=>format)\n");
 
 
+	if(pv_isset("$xbuff(msg[1])") &amp;&amp; $xbuff(msg[1]=>type) == "pid") {
+		xlogl("L_INFO","Echo reply to: $xbuff(msg[1]=>format)\n");
+		erl_send("$xbuff(msg[1])","$xbuff(msg[0])");
+	}
 }
 }
 ...
 ...
 
 
@@ -629,9 +665,13 @@ event_route[erlang:self] {
 &lt;14808.9.0&gt;
 &lt;14808.9.0&gt;
 ([email protected])14> Pid ! ["hello from",self()].
 ([email protected])14> Pid ! ["hello from",self()].
 ["hello from",&lt;0.247.0&gt;]
 ["hello from",&lt;0.247.0&gt;]
+([email protected])15> flush().
+Shell got "hello from"
+ok
 
 
-&gt; logged info message:
-INFO: &lt;script&gt;: 957:Received message: ["hello from", "&lt;[email protected]>"]
+&gt; logged info messages:
+INFO: &lt;script&gt;: 653:Received message: ["hello from", &lt;[email protected]&gt;]
+INFO: &lt;script&gt;: 656:Echo reply to: &lt;[email protected]&gt;]
 &gt;
 &gt;
 				</programlisting>
 				</programlisting>
 			</example>
 			</example>

+ 19 - 0
modules/erlang/doc/erlang_devel.xml

@@ -70,6 +70,25 @@
 				</listitem>
 				</listitem>
 			</itemizedlist>
 			</itemizedlist>
 		</section>
 		</section>
+		<section>
+			<title>
+				<function moreinfo="none">send(pid,msg)</function>
+			</title>
+			<para>
+				This function sends an Erlang term to a process. On success return 0.
+			</para>
+			<para>Meaning of parameters is as follows:</para>
+			<itemizedlist mark="none">
+				<listitem>
+					<para><emphasis>erlang_pid *pid</emphasis> - pid of the intended
+					recipient process on remote node.</para>
+				</listitem>
+				<listitem>
+					<para><emphasis>ei_x_buff *msg</emphasis> - dynamic ei buffer
+					with encoded erlang term. The msg must be encoded with version byte.</para>
+				</listitem>
+			</itemizedlist>
+		</section>
 		<section>
 		<section>
 			<title>
 			<title>
 				<function moreinfo="none">reply(msg)</function>
 				<function moreinfo="none">reply(msg)</function>

+ 82 - 0
modules/erlang/erl_api.c

@@ -32,6 +32,7 @@
 
 
 int _impl_api_rpc_call(ei_x_buff* reply, const str *module,const str *function, const ei_x_buff *args);
 int _impl_api_rpc_call(ei_x_buff* reply, const str *module,const str *function, const ei_x_buff *args);
 int _impl_reg_send(const str *server, const ei_x_buff *msg);
 int _impl_reg_send(const str *server, const ei_x_buff *msg);
+int _impl_send(const erlang_pid *pid, const ei_x_buff *msg);
 int _impl_reply(const ei_x_buff *msg);
 int _impl_reply(const ei_x_buff *msg);
 int xavp2xbuff(ei_x_buff *xbuff, sr_xavp_t *xavp);
 int xavp2xbuff(ei_x_buff *xbuff, sr_xavp_t *xavp);
 int xbuff2xavp(sr_xavp_t **xavp, ei_x_buff *xbuff);
 int xbuff2xavp(sr_xavp_t **xavp, ei_x_buff *xbuff);
@@ -45,6 +46,7 @@ int load_erl( erl_api_t *erl_api )
 {
 {
 	erl_api->rpc = _impl_api_rpc_call;
 	erl_api->rpc = _impl_api_rpc_call;
 	erl_api->reg_send = _impl_reg_send;
 	erl_api->reg_send = _impl_reg_send;
+	erl_api->send = _impl_send;
 	erl_api->reply = _impl_reply;
 	erl_api->reply = _impl_reply;
 	erl_api->xavp2xbuff = xavp2xbuff;
 	erl_api->xavp2xbuff = xavp2xbuff;
 	erl_api->xbuff2xavp = xbuff2xavp;
 	erl_api->xbuff2xavp = xbuff2xavp;
@@ -259,3 +261,83 @@ int _impl_reply(const ei_x_buff *msg)
 
 
 	return 0;
 	return 0;
 }
 }
+
+int _impl_send(const erlang_pid *pid, const ei_x_buff *msg)
+{
+	struct msghdr msgh;
+	struct iovec cnt[6];
+	int pid_no = my_pid();
+	eapi_t api = API_SEND;
+	int buffsz;
+	int rc;
+	int i=0,version;
+
+	if (ei_decode_version(msg->buff,&i,&version)) {
+		LM_ERR("msg must be encoded with version\n");
+		return -1;
+	}
+
+	if (enode) {
+
+		/* copy into reply */
+		if (enode->response.buffsz < msg->buffsz) {
+			/* realocate */
+			enode->response.buff=realloc(enode->response.buff,msg->buffsz);
+			if (!enode->response.buff) {
+				LM_ERR("realloc failed: not enough memory\n");
+				return -1;
+			}
+			enode->response.buffsz = msg->buffsz;
+		}
+
+		memcpy((void*)enode->response.buff,(void*)msg->buff,msg->buffsz);
+		enode->response.index = msg->index;
+
+		/* address process */
+		cnode_reply_to_pid = (erlang_pid *)pid;
+		return 0;
+	} else if (csockfd) {
+
+		/* send via cnode */
+		memset(&msgh, 0, sizeof(msgh));
+		memset(&cnt, 0, sizeof(cnt));
+
+		/* Kamailio PID */
+		cnt[0].iov_base = (void*)&pid_no;
+		cnt[0].iov_len  = sizeof(pid_no);
+
+		/* method */
+		cnt[1].iov_base = (void*)&api;
+		cnt[1].iov_len = sizeof(api);
+
+		/* put size of following data */
+		buffsz = msg->index; /* occupied size */
+		cnt[2].iov_base = (void*)&buffsz;
+		cnt[2].iov_len = sizeof(buffsz);
+
+		/* module name */
+		cnt[3].iov_base = (void*)pid;
+		cnt[3].iov_len  = sizeof(erlang_pid);
+
+		/* Erlang arguments content */
+		cnt[4].iov_base = (void*)msg->buff;
+		cnt[4].iov_len = buffsz; /* occupied size */
+
+		msgh.msg_iov = cnt;
+		msgh.msg_iovlen = 5;
+
+		while ((rc = sendmsg(csockfd, &msgh, 0)) == -1 && errno == EAGAIN)
+			;
+
+		if (rc == -1) {
+			LM_ERR("sendmsg failed: %s\n",strerror(errno));
+			return -1;
+		}
+	} else {
+		LM_ERR("not connected\n");
+		return -1;
+	}
+
+	/* no reply */
+	return 0;
+}

+ 2 - 0
modules/erlang/erl_api.h

@@ -32,6 +32,7 @@
 typedef int (*erl_rpc_f)(ei_x_buff* reply, const str *module,const str *function, const ei_x_buff *args);
 typedef int (*erl_rpc_f)(ei_x_buff* reply, const str *module,const str *function, const ei_x_buff *args);
 typedef int (*erl_reg_send_f)(const str *server,const ei_x_buff *msg);
 typedef int (*erl_reg_send_f)(const str *server,const ei_x_buff *msg);
 typedef int (*erl_reply_f)(const ei_x_buff *msg);
 typedef int (*erl_reply_f)(const ei_x_buff *msg);
+typedef int (*erl_send_f)(const erlang_pid *pid,const ei_x_buff *msg);
 
 
 /* data serialization */
 /* data serialization */
 typedef int (*xavp2xbuff_f)(ei_x_buff *xbuff, sr_xavp_t *xavp);
 typedef int (*xavp2xbuff_f)(ei_x_buff *xbuff, sr_xavp_t *xavp);
@@ -40,6 +41,7 @@ typedef int (*xbuff2xavp_f)(sr_xavp_t **xavp, ei_x_buff *xbuff);
 typedef struct erl_api_s {
 typedef struct erl_api_s {
 	erl_rpc_f rpc;
 	erl_rpc_f rpc;
 	erl_reg_send_f reg_send;
 	erl_reg_send_f reg_send;
+	erl_send_f send;
 	erl_reply_f reply;
 	erl_reply_f reply;
 	xavp2xbuff_f xavp2xbuff;
 	xavp2xbuff_f xavp2xbuff;
 	xbuff2xavp_f xbuff2xavp;
 	xbuff2xavp_f xbuff2xavp;

+ 2 - 1
modules/erlang/erl_helpers.h

@@ -147,7 +147,8 @@ int ei_decode_strorbin(char *buf, int *index, int maxlen, char *dst);
 
 
 typedef enum {
 typedef enum {
 	API_RPC_CALL,
 	API_RPC_CALL,
-	API_REG_SEND
+	API_REG_SEND,
+	API_SEND
 } eapi_t;
 } eapi_t;
 
 
 #endif /* ERL_HELPERS_H_ */
 #endif /* ERL_HELPERS_H_ */

+ 17 - 5
modules/erlang/handle_emsg.c

@@ -141,6 +141,9 @@ int handle_reg_send(cnode_handler_t *phandler, erlang_msg * msg)
 	}
 	}
 	xpid->val.type = SR_XTYPE_XAVP;
 	xpid->val.type = SR_XTYPE_XAVP;
 
 
+	/* registered process reply to from */
+	cnode_reply_to_pid = &msg->from;
+
 	backup_rt = get_route_type();
 	backup_rt = get_route_type();
 	set_route_type(EVENT_ROUTE);
 	set_route_type(EVENT_ROUTE);
 	init_run_actions_ctx(&ctx);
 	init_run_actions_ctx(&ctx);
@@ -734,6 +737,8 @@ int handle_erlang_msg(cnode_handler_t *phandler, erlang_msg * msg)
 
 
 	if (msg->msgtype == ERL_REG_SEND )
 	if (msg->msgtype == ERL_REG_SEND )
 	{
 	{
+		cnode_reply_to_pid = &msg->from;
+
 		if (!strncmp(msg->toname, "net_kernel",MAXATOMLEN)) {
 		if (!strncmp(msg->toname, "net_kernel",MAXATOMLEN)) {
 			/* respond to ping stuff */
 			/* respond to ping stuff */
 			ret = handle_net_kernel(phandler, msg);
 			ret = handle_net_kernel(phandler, msg);
@@ -744,7 +749,8 @@ int handle_erlang_msg(cnode_handler_t *phandler, erlang_msg * msg)
 			/* try registered process */
 			/* try registered process */
 			handle_reg_send(phandler,msg);
 			handle_reg_send(phandler,msg);
 		}
 		}
-		from = msg->from;
+	} else if (msg->msgtype == ERL_SEND) {
+		ret = handle_send(phandler, msg);
 	} else {
 	} else {
 		/* TODO: fix below after adding #Pid and #Ref in PVs */
 		/* TODO: fix below after adding #Pid and #Ref in PVs */
 		request->index = 0;
 		request->index = 0;
@@ -785,24 +791,30 @@ int handle_erlang_msg(cnode_handler_t *phandler, erlang_msg * msg)
 
 
 	if (ret)
 	if (ret)
 	{
 	{
+		/* reset pid */
+		cnode_reply_to_pid = NULL;
 		return ret;
 		return ret;
 	}
 	}
-	else if (response->index > 1)
+	else if (response->index > 1 && cnode_reply_to_pid)
 	{
 	{
-		ei_x_print_msg(response, &from, 1);
+		ei_x_print_msg(response, cnode_reply_to_pid, 1);
 
 
-		if (ei_send(phandler->sockfd, &from, response->buff, response->index))
+		if (ei_send(phandler->sockfd, cnode_reply_to_pid, response->buff, response->index))
 		{
 		{
 			LM_ERR("ei_send failed on node=<%s> socket=<%d>, %s\n",
 			LM_ERR("ei_send failed on node=<%s> socket=<%d>, %s\n",
 					phandler->ec.thisnodename,phandler->sockfd, strerror(erl_errno));
 					phandler->ec.thisnodename,phandler->sockfd, strerror(erl_errno));
 		}
 		}
 
 
+		/* reset pid */
+		cnode_reply_to_pid = NULL;
 		return ret;
 		return ret;
-
 	}
 	}
 	else
 	else
 	{
 	{
 		LM_DBG("** no reply **\n");
 		LM_DBG("** no reply **\n");
+
+		/* reset pid */
+		cnode_reply_to_pid = NULL;
 		return 0;
 		return 0;
 	}
 	}
 }
 }

+ 303 - 0
modules/erlang/mod_erlang.c

@@ -62,10 +62,12 @@ static int postprocess_request(struct sip_msg *msg, unsigned int flags, void *_p
 /*  exported functions */
 /*  exported functions */
 static int erl_rpc(struct sip_msg *msg, char *module, char *function, char *args, char *reply);
 static int erl_rpc(struct sip_msg *msg, char *module, char *function, char *args, char *reply);
 static int erl_reg_send_k(struct sip_msg *msg, char *_server, char *_emsg);
 static int erl_reg_send_k(struct sip_msg *msg, char *_server, char *_emsg);
+static int erl_send_k(struct sip_msg *msg, char *_server, char *_emsg);
 static int erl_reply_k(struct sip_msg *msg, char *_emsg);
 static int erl_reply_k(struct sip_msg *msg, char *_emsg);
 
 
 /* fix-ups */
 /* fix-ups */
 static int fixup_rpc(void** param, int param_no);
 static int fixup_rpc(void** param, int param_no);
+static int fixup_send(void** param, int param_no);
 static int fixup_reg(void** param, int param_no);
 static int fixup_reg(void** param, int param_no);
 static int fixup_reply(void** param, int param_no);
 static int fixup_reply(void** param, int param_no);
 
 
@@ -164,6 +166,7 @@ static param_export_t parameters[] =
 static cmd_export_t commands[] =
 static cmd_export_t commands[] =
 {
 {
 		{"erl_rpc", (cmd_function)erl_rpc, 4, fixup_rpc, 0, ANY_ROUTE},
 		{"erl_rpc", (cmd_function)erl_rpc, 4, fixup_rpc, 0, ANY_ROUTE},
+		{"erl_send", (cmd_function)erl_send_k, 2, fixup_send, 0, ANY_ROUTE},
 		{"erl_reg_send", (cmd_function)erl_reg_send_k, 2, fixup_reg, 0, ANY_ROUTE},
 		{"erl_reg_send", (cmd_function)erl_reg_send_k, 2, fixup_reg, 0, ANY_ROUTE},
 		{"erl_reply", (cmd_function)erl_reply_k, 1, fixup_reply, 0, EVENT_ROUTE},
 		{"erl_reply", (cmd_function)erl_reply_k, 1, fixup_reply, 0, EVENT_ROUTE},
 		{"load_erl",(cmd_function)load_erl,0, 0,         0,         0}, /* API loader */
 		{"load_erl",(cmd_function)load_erl,0, 0,         0,         0}, /* API loader */
@@ -1076,3 +1079,303 @@ static int fixup_reply(void** param, int param_no)
 
 
 	return 0;
 	return 0;
 }
 }
+
+static int erl_send_k(struct sip_msg *msg, char *_pid, char *_emsg)
+{
+	erl_param_t *param_pid=(erl_param_t*)_pid;
+	erl_param_t *param_emsg=(erl_param_t*)_emsg;
+
+	str str_msg;
+	sr_xavp_t *xmsg=NULL;
+	pv_spec_t sp;
+	pv_spec_t *nsp = NULL;
+	pv_param_t  pvp;
+	pv_name_t *pvn;
+	pv_index_t *pvi;
+	int idx;
+	int idxf;
+	int attr;
+	ei_x_buff ei_msg;
+	erlang_pid *pid;
+
+	switch (param_pid->type) {
+	case ERL_PARAM_XBUFF_SPEC:
+		sp = param_pid->value.sp;
+		pvp = sp.pvp; /* work on copy */
+
+		if (pvp.pvn.type != PV_NAME_INTSTR || !(pvp.pvn.u.isname.type & AVP_NAME_STR)) {
+			LM_ERR("unsupported name of pid\n");
+			return -1;
+		}
+
+		if( pvp.pvn.type == PV_NAME_PVAR) {
+			nsp = pvp.pvn.u.dname;
+		}
+
+		if (nsp) {
+			pvi = &nsp->pvp.pvi;
+			pvn = &nsp->pvp.pvn;
+		} else {
+			pvi = &pvp.pvi;
+			pvn = &pvp.pvn;
+		}
+
+		if (sp.getf == pv_pid_get ) {
+			xmsg = pv_pid_get_pid(&pvn->u.isname.name.s);
+		} else if (sp.setf == pv_xbuff_set) {
+			xmsg = pv_xbuff_get_xbuff(&pvn->u.isname.name.s);
+		} else {
+			LM_ERR("BUG: unexpected type for pid parameter\n");
+			return -1;
+		}
+
+		/* fix index */
+		attr = xbuff_get_attr_flags(pvi->type);
+		pvi->type = xbuff_fix_index(pvi->type);
+
+		/* get the index */
+		if(pv_get_spec_index(msg, &pvp, &idx, &idxf))
+		{
+			LM_ERR("invalid index\n");
+			return -1;
+		}
+
+		if (xbuff_is_attr_set(attr)) {
+			LM_WARN("attribute is not expected here!\n");
+		}
+
+		if (!xmsg) {
+			LM_ERR("undefined variable '%.*s'\n",STR_FMT(&pvn->u.isname.name.s));
+			return -1;
+		}
+
+		xmsg = xmsg->val.v.xavp;
+
+		if ((idxf != PV_IDX_ALL) && !xbuff_is_no_index(attr) ) {
+			xmsg = xavp_get_nth(&xmsg->val.v.xavp,idx,NULL);
+		}
+
+		if (!xmsg) {
+			LM_ERR("undefined value in '%.*s' at index %d\n",STR_FMT(&pvn->u.isname.name.s),idx);
+			goto err;
+		}
+
+		/* erlang_pid <- XAVP */
+		if (xmsg->name.s[0] == 'p' && xmsg->val.type == SR_XTYPE_DATA && xmsg->val.v.data) {
+			pid = xmsg->val.v.data->p;
+		} else {
+			LM_ERR("invalid value for pid parameter\n");
+			return -1;
+		}
+		break;
+	default:
+		LM_ERR("unexpected type for pid parameter\n");
+		return -1;
+	}
+
+	ei_x_new_with_version(&ei_msg);
+
+	switch(param_emsg->type){
+	case ERL_PARAM_FPARAM:
+		if(get_str_fparam(&str_msg,msg,&param_emsg->value.fp)){
+			LM_ERR("can't get emsg parameter\n");
+			goto err;
+		}
+
+		ei_x_encode_string_len(&ei_msg,str_msg.s,str_msg.len);
+
+		break;
+	case ERL_PARAM_XBUFF_SPEC:
+		sp = param_emsg->value.sp;
+		pvp = sp.pvp; /* work on copy */
+
+		if (pvp.pvn.type != PV_NAME_INTSTR || !(pvp.pvn.u.isname.type & AVP_NAME_STR)) {
+			LM_ERR("unsupported name of list\n");
+			return -1;
+		}
+
+		if( pvp.pvn.type == PV_NAME_PVAR) {
+			nsp = pvp.pvn.u.dname;
+		}
+
+		if (nsp) {
+			pvi = &nsp->pvp.pvi;
+			pvn = &nsp->pvp.pvn;
+		} else {
+			pvi = &pvp.pvi;
+			pvn = &pvp.pvn;
+		}
+
+		if (sp.setf == pv_list_set ) {
+			xmsg = pv_list_get_list(&pvn->u.isname.name.s);
+		} else if (sp.setf == pv_xbuff_set) {
+			xmsg = pv_xbuff_get_xbuff(&pvn->u.isname.name.s);
+		}  else if (sp.setf == pv_tuple_set) {
+			xmsg = pv_tuple_get_tuple(&pvn->u.isname.name.s);
+		}
+
+		/* fix index */
+		attr = xbuff_get_attr_flags(pvi->type);
+		pvi->type = xbuff_fix_index(pvi->type);
+
+		/* get the index */
+		if(pv_get_spec_index(msg, &pvp, &idx, &idxf))
+		{
+			LM_ERR("invalid index\n");
+			return -1;
+		}
+
+		if (xbuff_is_attr_set(attr)) {
+			LM_WARN("attribute is not expected here!\n");
+		}
+
+		if (!xmsg) {
+			LM_ERR("undefined variable '%.*s'\n",STR_FMT(&pvn->u.isname.name.s));
+			return -1;
+		}
+
+		xmsg = xmsg->val.v.xavp;
+
+		if ((idxf != PV_IDX_ALL) && !xbuff_is_no_index(attr) ) {
+			xmsg = xavp_get_nth(&xmsg->val.v.xavp,idx,NULL);
+		}
+
+		if (!xmsg) {
+			LM_ERR("undefined value in '%.*s' at index %d\n",STR_FMT(&pvn->u.isname.name.s),idx);
+			goto err;
+		}
+
+		/* ei_x_buff <- XAVP */
+		if (erl_api.xavp2xbuff(&ei_msg,xmsg)) {
+			LM_ERR("failed to encode %.*s\n",STR_FMT(&pvn->u.isname.name.s));
+			goto err;
+		}
+
+		break;
+	default:
+		LM_ERR("unexpected type for emsg parameter\n");
+		return -1;
+	}
+
+	if (erl_api.send(pid,&ei_msg)) {
+		goto err;
+	}
+
+	ei_x_free(&ei_msg);
+
+	return 1;
+
+err:
+	ei_x_free(&ei_msg);
+
+	return -1;
+}
+
+static int fixup_send(void** param, int param_no)
+{
+	erl_param_t *erl_param;
+
+	str s;
+
+	erl_param=(erl_param_t*)pkg_malloc(sizeof(erl_param_t));
+
+	if(!erl_param) {
+		LM_ERR("no more memory\n");
+		return -1;
+	}
+
+	memset(erl_param,0,sizeof(erl_param_t));
+
+	if (param_no==1) {
+
+		s.s = (char*)*param; s.len = strlen(s.s);
+
+		if (pv_parse_avp_name(&erl_param->value.sp,&s)) {
+			LM_ERR("failed to parse parameter #%d\n",param_no);
+			pkg_free((void*)erl_param);
+			return E_UNSPEC;
+		}
+
+		if (erl_param->value.sp.pvp.pvn.type == PV_NAME_INTSTR) {
+			if (fix_param_types(FPARAM_STR|FPARAM_STRING,param)) {
+				LM_ERR("wrong parameter #%d\n",param_no);
+				pkg_free((void*)erl_param);
+				return E_UNSPEC;
+			}
+			LM_INFO("param emsg is PV_NAME_INTSTR\n");
+			erl_param->type = ERL_PARAM_FPARAM;
+			erl_param->value.fp = *(fparam_t*)*param;
+		} else if(pv_parse_spec( &s, &erl_param->value.sp)==NULL) {
+
+			/* only XBUFF is accepted for emsg */
+			LM_ERR("wrong parameter #%d\n",param_no);
+			pv_spec_free(&erl_param->value.sp);
+			pkg_free((void*)erl_param);
+			return E_UNSPEC;
+		} else {
+			if (erl_param->value.sp.type ==PVT_XAVP) {
+				LM_ERR("XAVP not acceptable for parameter #%d\n",param_no);
+				pkg_free((void*)erl_param);
+				return E_UNSPEC;
+			}
+
+			if (erl_param->value.sp.getf == pv_pid_get
+					|| erl_param->value.sp.getf == pv_xbuff_get) {
+				erl_param->type = ERL_PARAM_XBUFF_SPEC;
+			} else {
+				erl_param->type = ERL_PARAM_FPARAM;
+				erl_param->value.fp = *(fparam_t*)*param;
+			}
+		}
+	}
+
+	if (param_no==2) {
+
+		s.s = (char*)*param; s.len = strlen(s.s);
+
+		if (pv_parse_avp_name(&erl_param->value.sp,&s)) {
+			LM_ERR("failed to parse parameter #%d\n",param_no);
+			pkg_free((void*)erl_param);
+			return E_UNSPEC;
+		}
+
+		if (erl_param->value.sp.pvp.pvn.type == PV_NAME_INTSTR) {
+			if (fix_param_types(FPARAM_STR|FPARAM_STRING,param)) {
+				LM_ERR("wrong parameter #%d\n",param_no);
+				pkg_free((void*)erl_param);
+				return E_UNSPEC;
+			}
+			LM_INFO("param emsg is PV_NAME_INTSTR\n");
+			erl_param->type = ERL_PARAM_FPARAM;
+			erl_param->value.fp = *(fparam_t*)*param;
+		} else if(pv_parse_spec( &s, &erl_param->value.sp)==NULL) {
+
+			/* only XBUFF is accepted for emsg */
+			LM_ERR("wrong parameter #%d\n",param_no);
+			pv_spec_free(&erl_param->value.sp);
+			pkg_free((void*)erl_param);
+			return E_UNSPEC;
+		} else {
+			if (erl_param->value.sp.type ==PVT_XAVP) {
+				LM_ERR("XAVP not acceptable for parameter #%d\n",param_no);
+				pkg_free((void*)erl_param);
+				return E_UNSPEC;
+			}
+
+			if (erl_param->value.sp.setf == pv_list_set
+					|| erl_param->value.sp.setf == pv_xbuff_set
+					|| erl_param->value.sp.setf == pv_tuple_set
+					|| erl_param->value.sp.setf == pv_atom_set) {
+
+				erl_param->type = ERL_PARAM_XBUFF_SPEC;
+			} else {
+				erl_param->type = ERL_PARAM_FPARAM;
+				erl_param->value.fp = *(fparam_t*)*param;
+			}
+		}
+	}
+
+	*param = (void*)erl_param;
+
+	return 0;
+}

+ 5 - 0
modules/erlang/pv_pid.c

@@ -34,6 +34,11 @@ static str pid_list=str_init("[pids]");
 static char *pid_fmt_buff = NULL;
 static char *pid_fmt_buff = NULL;
 static int counter;
 static int counter;
 
 
+sr_xavp_t *pv_pid_get_pid(str *name)
+{
+	return xavp_get_child(&pid_list,name);
+}
+
 int pv_pid_parse_name(pv_spec_t *sp, str *in)
 int pv_pid_parse_name(pv_spec_t *sp, str *in)
 {
 {
 	char *p;
 	char *p;

+ 2 - 0
modules/erlang/pv_pid.h

@@ -30,6 +30,8 @@ int pv_pid_parse_name(pv_spec_t *sp, str *in);
 int pv_pid_set(struct sip_msg* msg,  pv_param_t* param, int op, pv_value_t* val);
 int pv_pid_set(struct sip_msg* msg,  pv_param_t* param, int op, pv_value_t* val);
 int pv_pid_get(struct sip_msg*, pv_param_t*, pv_value_t*);
 int pv_pid_get(struct sip_msg*, pv_param_t*, pv_value_t*);
 
 
+sr_xavp_t *pv_pid_get_pid(str *name);
+
 void free_pid_fmt_buff();
 void free_pid_fmt_buff();
 
 
 #endif /* PV_PID_H_ */
 #endif /* PV_PID_H_ */

+ 93 - 0
modules/erlang/worker.c

@@ -30,6 +30,7 @@
 
 
 int worker_rpc_impl(ei_cnode *ec, int s, int wpid);
 int worker_rpc_impl(ei_cnode *ec, int s, int wpid);
 int worker_reg_send_impl(ei_cnode *ec, int s, int wpid);
 int worker_reg_send_impl(ei_cnode *ec, int s, int wpid);
+int worker_send_impl(ei_cnode *ec, int s, int wpid);
 
 
 int worker_init(worker_handler_t *phandler, int fd, const ei_cnode *ec)
 int worker_init(worker_handler_t *phandler, int fd, const ei_cnode *ec)
 {
 {
@@ -89,6 +90,10 @@ int handle_worker(handler_common_t *phandler)
 		if (worker_reg_send_impl(&w->ec,w->sockfd,wpid))
 		if (worker_reg_send_impl(&w->ec,w->sockfd,wpid))
 			return -1;
 			return -1;
 		break;
 		break;
+	case API_SEND:
+		if (worker_send_impl(&w->ec,w->sockfd,wpid))
+			return -1;
+		break;
 	default:
 	default:
 		LM_ERR("BUG: bad method or not implemented!\n");
 		LM_ERR("BUG: bad method or not implemented!\n");
 		return 1;
 		return 1;
@@ -357,3 +362,91 @@ err:
 
 
 	return -1;
 	return -1;
 }
 }
+
+int worker_send_impl(ei_cnode *ec, int s,int wpid)
+{
+	erlang_pid pid;
+	ei_x_buff emsg;
+	struct msghdr msgh;
+	struct iovec cnt[6];
+	int rc;
+
+	memset((void*)&emsg,0,sizeof(emsg));
+
+	memset((void*)&msgh,0,sizeof(msgh));
+
+	/* Erlang args size */
+	cnt[0].iov_base = &emsg.buffsz;
+	cnt[0].iov_len  = sizeof(int);
+
+	/* get data size */
+	msgh.msg_iov    = cnt;
+	msgh.msg_iovlen = 1;
+
+	while ((rc = recvmsg(s, &msgh, MSG_PEEK)) == -1 && errno == EAGAIN)
+		;
+
+	if (rc == -1){
+		LM_ERR("recvmsg failed (socket=%d): %s\n",s,strerror(errno));
+		return -1;
+	}
+
+	emsg.buff = (char*)malloc(emsg.buffsz);
+	if (!emsg.buff) {
+		LM_ERR("malloc: not enough memory\n");
+		goto err;
+	}
+
+	/* buffers */
+	cnt[1].iov_base = &pid;
+	cnt[1].iov_len  = sizeof(erlang_pid);
+
+	cnt[2].iov_base = emsg.buff;
+	cnt[2].iov_len  = emsg.buffsz;
+
+	/* get whole data */
+	msgh.msg_iovlen = 3;
+	while ((rc = recvmsg(s, &msgh, MSG_WAITALL)) == -1 && errno == EAGAIN)
+		;
+
+	if (rc == -1){
+		LM_ERR("recvmsg failed (socket=%d): %s\n",s,strerror(errno));
+		goto err;
+	}
+
+	if(!enode) {
+		LM_NOTICE("there is no connected Erlang node\n");
+		goto err;
+	}
+
+	LM_DBG(">> <%s.%d.%d> ! emsg\n",pid.node,pid.num,pid.serial);
+
+	EI_X_BUFF_PRINT(&emsg);
+
+	/* do ERL_SEND */
+	if ((rc = ei_send(enode->sockfd,&pid,emsg.buff,emsg.buffsz)) == ERL_ERROR)
+	{
+		if (erl_errno)
+		{
+			LM_ERR("ei_send failed on node=<%s> socket=<%d>: %s\n",enode->conn.nodename,enode->sockfd,strerror(erl_errno));
+		}
+		else if (errno)
+		{
+			LM_ERR("ei_send failed on node=<%s> socket=<%d>: %s\n",enode->conn.nodename,enode->sockfd,strerror(errno));
+		}
+		else
+		{
+			LM_ERR("ei_send failed on node=<%s> socket=<%d>, Unknown error.\n",ec->thisalivename,enode->sockfd);
+		}
+	}
+
+	free(emsg.buff);
+
+	return 0;
+
+err:
+
+	free(emsg.buff);
+
+	return -1;
+}