Selaa lähdekoodia

Added jsonrpc-c module, providing a client interface to json-rpc services
over netstrings.

Matthew Williams 14 vuotta sitten
vanhempi
commit
044a6b9187

+ 10 - 0
modules/jsonrpc-c/.gitignore

@@ -0,0 +1,10 @@
+*.d
+*.o
+*.so
+*.lst
+*.tar.gz
+*.swp
+*.swo
+tags
+TAGS
+doc/*.txt

+ 19 - 0
modules/jsonrpc-c/Makefile

@@ -0,0 +1,19 @@
+#
+# jsonrpc module makefile
+#
+# 
+# WARNING: do not run this directly, it should be run by the master Makefile
+
+include ../../Makefile.defs
+auto_gen=
+NAME=jsonrpc-c.so
+LIBS=-lm -levent
+
+DEFS+=-I/usr/include/json -I$(LOCALBASE)/include/json \
+       -I$(LOCALBASE)/include
+LIBS+=-L$(SYSBASE)/include/lib -L$(LOCALBASE)/lib -ljson
+ 
+DEFS+=-DOPENSER_MOD_INTERFACE
+
+SERLIBPATH=../../lib
+include ../../Makefile.modules

+ 166 - 0
modules/jsonrpc-c/README

@@ -0,0 +1,166 @@
+jsonrpc-c (client) Module
+
+Matthew Williams
+
+   <[email protected]>
+
+Edited by
+
+Jordan Levy
+
+   <[email protected]>
+
+   Copyright © 2011 Flowroute LLC (flowroute.com)
+     __________________________________________________________________
+
+   Table of Contents
+
+   1. Admin Guide
+
+        1. Overview
+        2. Dependencies
+
+              2.1. Kamailio Modules
+              2.2. External Libraries or Applications
+
+        3. Exported Parameters
+
+              3.1. servers (string)
+
+        4. Exported Functions
+
+              4.1. jsonrpc_notification(method, parameters)
+              4.2. jsonrpc_request(method, parameters, return_route,
+                      error_route, result_var)
+
+   List of Examples
+
+   1.1. Set servers parameter
+   1.2. jsonrpc_notification usage
+   1.3. jsonrpc_request usage
+
+Chapter 1. Admin Guide
+
+   Table of Contents
+
+   1. Overview
+   2. Dependencies
+
+        2.1. Kamailio Modules
+        2.2. External Libraries or Applications
+
+   3. Exported Parameters
+
+        3.1. servers (string)
+
+   4. Exported Functions
+
+        4.1. jsonrpc_notification(method, parameters)
+        4.2. jsonrpc_request(method, parameters, return_route,
+                error_route, result_var)
+
+1. Overview
+
+   This module provides access to json-rpc services (operating over
+   TCP/Netstrings).
+
+   This module uses t_suspend() and t_continue() from the TM module.
+
+   Note that after invoking an asyncronous operation, the processing will
+   continue later, in another application process. Therefore, do not rely
+   on variables stored in private memory, use shared memory if you want to
+   get values after the processing is resumend (e.g., $shv(...) or htable
+   $sht(...)).
+
+2. Dependencies
+
+   2.1. Kamailio Modules
+   2.2. External Libraries or Applications
+
+2.1. Kamailio Modules
+
+   The following modules must be loaded before this module:
+     * tm - transaction management.
+
+2.2. External Libraries or Applications
+
+   The following libraries or applications must be installed before
+   running Kamailio with this module loaded:
+     * libjson - http://metaparadigm.com/json-c/
+
+3. Exported Parameters
+
+   3.1. servers (string)
+
+3.1. servers (string)
+
+   The servers providing the remote jsonrpc service. Format is
+   "host1:port1,priority1 host2:port2,priority2". Requests to servers of
+   the same priority will be distributed evenly (round robin). Server
+   groups with higher priority are used first.
+
+   Example 1.1. Set servers parameter
+...
+modparam("jsonrpc", "servers", "localhost:9999,2 10.10.0.1:9999,2 backup.server:
+9999,1")
+...
+
+4. Exported Functions
+
+   4.1. jsonrpc_notification(method, parameters)
+   4.2. jsonrpc_request(method, parameters, return_route, error_route,
+          result_var)
+
+4.1.  jsonrpc_notification(method, parameters)
+
+   Invokes the remote 'method' with the given 'parameters' as a
+   notification. Unlike jsonrpc_request (below), notifications do not
+   receive a response. Script processing continues in the usual fashion as
+   soon as the notification has been sent.
+
+   The method and parameters can be a static string or dynamic string
+   value with config variables.
+
+   Example 1.2. jsonrpc_notification usage
+...
+jsonrpc_notification("update_user", "{'id': 1234, 'name': 'Petros'}")
+...
+
+4.2.  jsonrpc_request(method, parameters, return_route, error_route,
+result_var)
+
+   Invokes the remote 'method' with the given 'parameters'. When the
+   response is received, continues processing of the SIP request with the
+   route[return_route]. If a timeout occurs, no servers can be reached, or
+   a jsonrpc error message is received, continues process at
+   route[error_route]. In this case, the result_var will contain one of
+   "timeout", "failure", or the error message received back from the
+   jsonrpc server.
+
+   The method, parameters, return_route, and error_route can be a static
+   string or a dynamic string value with config variables.
+
+   Since the SIP request handling is resumed in another process, the
+   config file execution is lost. As mentioned above, only shared
+   variables ($shv, etc) should be used for any value that will be needed
+   when the script is resumed.
+
+   The result is stored in the pseudo-variable 'result_var'. Since this
+   variable is set after the response is received, it is possible to use a
+   $var for this parameter.
+
+   Example 1.3. jsonrpc_request usage
+...
+jsonrpc_request("get_user", "{'id': 1234}", "RESPONSE", "ERROR", "$var(result)")
+;
+...
+route[RESPONSE] {
+        xlog("Result received: $var(result)");
+        ...
+}
+...
+route[ERROR] {
+        xlog("Error received: $var(result)");
+        ...
+}
+...

+ 1 - 0
modules/jsonrpc-c/TODO

@@ -0,0 +1 @@
+* "Reliable" Notifications (implement using requests, and failover on error; ignore non-error responses) ?

+ 4 - 0
modules/jsonrpc-c/doc/Makefile

@@ -0,0 +1,4 @@
+docs = jsonrpc.xml
+
+docbook_dir = ../../../docbook
+include $(docbook_dir)/Makefile.module

+ 37 - 0
modules/jsonrpc-c/doc/jsonrpc.xml

@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+
+<book xmlns:xi="http://www.w3.org/2001/XInclude">
+    <bookinfo>
+	<title>jsonrpc-c (client) Module</title>
+	<productname class="trade">sip-router.org</productname>
+	<authorgroup>
+	    <author>
+		<firstname>Matthew</firstname>
+		<surname>Williams</surname>
+		<email>[email protected]</email>
+	    </author>
+	    <editor>
+			<firstname>Jordan</firstname>
+			<surname>Levy</surname>
+			<email>[email protected]</email>
+	    </editor>
+	</authorgroup>
+	<copyright>
+	    <year>2011</year>
+	    <holder>Flowroute LLC (flowroute.com)</holder>
+	</copyright>
+    </bookinfo>
+    <toc></toc>
+    
+    <xi:include href="jsonrpc_admin.xml"/>
+    
+    
+</book>

+ 154 - 0
modules/jsonrpc-c/doc/jsonrpc_admin.xml

@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+<!-- Module User's Guide -->
+
+<chapter>
+
+	<title>&adminguide;</title>
+
+	<section>
+		<title>Overview</title>
+		<para>
+			This module provides access to json-rpc services (operating over TCP/Netstrings).
+		</para>
+		<para>
+			This module uses t_suspend() and t_continue() from the TM module.
+		</para>
+		<para>
+			Note that after invoking an asyncronous operation, the processing
+			will continue later, in another application process. Therefore, do not
+			rely on variables stored in private memory, use shared memory if you
+			want to get values after the processing is resumend (e.g., $shv(...)
+			or htable $sht(...)).
+		</para>
+	</section>
+
+	<section>
+		<title>Dependencies</title>
+		<section>
+			<title>&kamailio; Modules</title>
+			<para>
+				The following modules must be loaded before this module:
+				<itemizedlist>
+					<listitem>
+						<para>
+							<emphasis>tm</emphasis> - transaction management.
+						</para>
+					</listitem>
+				</itemizedlist>
+			</para>
+		</section>
+		<section>
+			<title>External Libraries or Applications</title>
+			<para>
+				The following libraries or applications must be installed before running
+				&kamailio; with this module loaded:
+				<itemizedlist>
+					<listitem>
+						<para>
+							<emphasis>libjson</emphasis> - http://metaparadigm.com/json-c/
+						</para>
+					</listitem>
+				</itemizedlist>
+			</para>
+		</section>
+	</section>
+	<section>
+		<title>Exported Parameters</title>
+		<section>
+			<title><varname>servers</varname> (string)</title>
+			<para>
+				The servers providing the remote jsonrpc service. Format is "host1:port1,priority1 host2:port2,priority2". Requests to servers of the same priority will be distributed evenly (round robin). Server groups with higher priority are used first. 
+			</para>
+			<example>
+				<title>Set <varname>servers</varname> parameter</title>
+				<programlisting format="linespecific">
+...
+modparam("jsonrpc", "servers", "localhost:9999,2 10.10.0.1:9999,2 backup.server:9999,1")
+...
+				</programlisting>
+			</example>
+		</section>
+	</section>
+	<section>
+		<title>Exported Functions</title>
+		<section>
+			<title>
+				<function moreinfo="none">jsonrpc_notification(method, parameters)</function>
+			</title>
+			<para>
+				Invokes the remote 'method' with the given 'parameters' as a notification.
+				Unlike jsonrpc_request (below), notifications do not receive a response. 
+				Script processing continues in the usual fashion as soon as the notification
+				has been sent.
+			</para>
+			<para>
+				The method and parameters can be a static string or dynamic string value with 
+				config variables.
+			</para>
+			<example>
+				<title><function>jsonrpc_notification</function> usage</title>
+				<programlisting format="linespecific">
+...
+jsonrpc_notification("update_user", "{'id': 1234, 'name': 'Petros'}")
+...
+				</programlisting>
+			</example>
+		</section>
+
+		<section>
+			<title>
+				<function moreinfo="none">jsonrpc_request(method, parameters, return_route, error_route, result_var)</function>
+			</title>
+			<para>
+				Invokes the remote 'method' with the given 'parameters'. 
+				When the response is received, continues processing of the SIP request
+				with the route[return_route]. If a timeout occurs, no servers can be reached,
+				or a jsonrpc error message is received, continues process at route[error_route].
+				In this case, the result_var will contain one of "timeout", "failure", or the error
+				message received back from the jsonrpc server.
+			</para>
+			<para>
+				The method, parameters, return_route, and error_route can be a static string or 
+				a dynamic string value with config variables.
+			</para>
+			<para>
+				Since the SIP request handling is resumed in another process,
+				the config file execution is lost. As mentioned above, only 
+				shared variables ($shv, etc) should be used for any value that
+				will be needed when the script is resumed.
+			</para>
+			<para>
+				The result is stored in the pseudo-variable 'result_var'. Since this 
+				variable is set <emphasis>after</emphasis> the response is received,
+				it is possible to use a $var for this parameter.
+			</para>
+			<example>
+				<title><function>jsonrpc_request</function> usage</title>
+				<programlisting format="linespecific">
+...
+jsonrpc_request("get_user", "{'id': 1234}", "RESPONSE", "ERROR", "$var(result)");
+...
+route[RESPONSE] {
+	xlog("Result received: $var(result)");
+	...
+}
+...
+route[ERROR] {
+	xlog("Error received: $var(result)");
+	...
+}
+...
+				</programlisting>
+			</example>
+		</section>
+	</section>
+</chapter>
+

+ 174 - 0
modules/jsonrpc-c/jsonrpc.c

@@ -0,0 +1,174 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Flowroute LLC (flowroute.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+
+#include "../../sr_module.h"
+#include "../../mem/mem.h"
+
+#include "jsonrpc.h"
+
+
+jsonrpc_request_t * request_table[JSONRPC_DEFAULT_HTABLE_SIZE] = {0};
+int next_id = 1;
+
+jsonrpc_request_t* get_request(int id);
+int store_request(jsonrpc_request_t* req);
+
+
+jsonrpc_request_t* build_jsonrpc_request(char *method, json_object *params, char *cbdata, int (*cbfunc)(json_object*, char*, int))
+{
+	if (next_id>JSONRPC_MAX_ID) {
+		next_id = 1;
+	} else {
+		next_id++;
+	}
+
+	jsonrpc_request_t *req = pkg_malloc(sizeof(jsonrpc_request_t));
+	if (!req) {
+		LM_ERR("Out of memory!");
+		return 0;
+	}
+	req->id = next_id;
+	req->cbfunc = cbfunc;
+	req->cbdata = cbdata;
+	req->next = NULL;
+	req->timer_ev = NULL;
+	if (!store_request(req))
+		return 0;
+
+	req->payload = json_object_new_object();
+
+	json_object_object_add(req->payload, "id", json_object_new_int(next_id));
+	json_object_object_add(req->payload, "jsonrpc", json_object_new_string("2.0"));
+	json_object_object_add(req->payload, "method", json_object_new_string(method));
+	json_object_object_add(req->payload, "params", params);
+
+	return req;
+}
+
+json_object* build_jsonrpc_notification(char *method, json_object *params) 
+{
+	json_object *req = json_object_new_object();
+	json_object_object_add(req, "jsonrpc", json_object_new_string("2.0"));
+	json_object_object_add(req, "method", json_object_new_string(method));
+	json_object_object_add(req, "params", params);
+
+	return req; 
+}
+
+
+int handle_jsonrpc_response(json_object *response)
+{
+	jsonrpc_request_t *req;	
+	json_object *_id = json_object_object_get(response, "id");
+	int id = json_object_get_int(_id);
+	
+	if (!(req = get_request(id))) {
+		json_object_put(response);
+		return -1;
+	}
+
+	json_object *result = json_object_object_get(response, "result");
+	
+	if (result) {
+		req->cbfunc(result, req->cbdata, 0);
+	} else {
+		json_object *error = json_object_object_get(response, "error");
+		if (error) {
+			req->cbfunc(error, req->cbdata, 1);
+		} else {
+			LM_ERR("Response received with neither a result nor an error.\n");
+			return -1;
+		}
+	}
+	
+	if (req->timer_ev) {
+		close(req->timerfd);
+		event_del(req->timer_ev);
+		pkg_free(req->timer_ev);
+	} else {
+		LM_ERR("No timer for req id %d\n", id);
+	}
+	pkg_free(req);
+	return 1;
+}
+
+int id_hash(int id) {
+	return (id % JSONRPC_DEFAULT_HTABLE_SIZE);
+}
+
+jsonrpc_request_t* get_request(int id) {
+	int key = id_hash(id);
+	jsonrpc_request_t *req, *prev_req = NULL;
+	req = request_table[key];
+	
+	while (req && req->id != id) {
+		prev_req = req;
+		if (!(req = req->next)) {
+			break;
+		};
+	}
+	
+	if (req && req->id == id) {
+		if (prev_req != NULL) {
+			prev_req-> next = req->next;
+		} else {
+			request_table[key] = NULL;
+		}
+		return req;
+	}
+	return 0;
+}
+
+void void_jsonrpc_request(int id) {
+	get_request(id);
+}
+
+int store_request(jsonrpc_request_t* req) {
+	int key = id_hash(req->id);
+	jsonrpc_request_t* existing;
+
+	if ((existing = request_table[key])) { /* collision */
+		jsonrpc_request_t* i;
+		for(i=existing; i; i=i->next) {
+			if (i == NULL) {
+				i = req;
+				LM_ERR("!!!!!!!");
+				return 1;
+			}
+			if (i->next == NULL) {
+				i->next = req;
+				return 1;
+			}
+		}
+	} else {
+		request_table[key] = req;
+	}
+	return 1;
+}
+

+ 52 - 0
modules/jsonrpc-c/jsonrpc.h

@@ -0,0 +1,52 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Flowroute LLC (flowroute.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#ifndef _JSONRPC_H_
+#define _JSONRPC_H_
+
+#define JSONRPC_DEFAULT_HTABLE_SIZE 500
+#define JSONRPC_MAX_ID 1000000
+
+#define JSONRPC_INTERNAL_SERVER_ERROR -32603
+
+#include <json.h>
+#include <event.h>
+
+typedef struct jsonrpc_request jsonrpc_request_t;
+
+struct jsonrpc_request {
+	int id, timerfd;
+	jsonrpc_request_t *next;
+	int (*cbfunc)(json_object*, char*, int);
+	char *cbdata;
+	json_object *payload;
+	struct event *timer_ev; 
+};
+
+json_object* build_jsonrpc_notification(char *method, json_object *params); 
+jsonrpc_request_t* build_jsonrpc_request(char *method, json_object *params, char *cbdata, int (*cbfunc)(json_object*, char*, int));
+int handle_jsonrpc_response(json_object *response);
+void void_jsonrpc_request(int id);
+#endif /* _JSONRPC_H_ */
+

+ 557 - 0
modules/jsonrpc-c/jsonrpc_io.c

@@ -0,0 +1,557 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Flowroute LLC (flowroute.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include <fcntl.h>
+#include <event.h>
+#include <sys/timerfd.h>
+
+#include "../../sr_module.h"
+#include "../../route.h"
+#include "../../route_struct.h"
+#include "../../lvalue.h"
+#include "../tm/tm_load.h"
+
+#include "jsonrpc_io.h"
+#include "jsonrpc.h"
+#include "netstring.h"
+
+#define CHECK_MALLOC_VOID(p)  if(!p) {LM_ERR("Out of memory!"); return;}
+#define CHECK_MALLOC(p)  if(!p) {LM_ERR("Out of memory!"); return -1;}
+
+struct jsonrpc_server {
+	char *host;
+	int  port, socket, status;
+	struct jsonrpc_server *next;
+	struct event *ev;
+	struct itimerspec *timer;
+};
+
+struct jsonrpc_server_group {
+	struct jsonrpc_server *next_server;
+	int    priority;
+	struct jsonrpc_server_group *next_group;
+};
+
+struct tm_binds tmb;
+
+struct jsonrpc_server_group *server_group;
+
+void socket_cb(int fd, short event, void *arg);
+void cmd_pipe_cb(int fd, short event, void *arg);
+int  set_non_blocking(int fd);
+int  parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr);
+int  connect_servers(struct jsonrpc_server_group *group);
+int  connect_server(struct jsonrpc_server *server);
+int  handle_server_failure(struct jsonrpc_server *server);
+
+int jsonrpc_io_child_process(int cmd_pipe, char* _servers)
+{
+	if (parse_servers(_servers, &server_group) != 0)
+	{
+		LM_ERR("servers parameter could not be parsed\n");
+		return -1;
+	}
+
+	event_init();
+	
+	struct event pipe_ev;
+	set_non_blocking(cmd_pipe);
+	event_set(&pipe_ev, cmd_pipe, EV_READ | EV_PERSIST, cmd_pipe_cb, &pipe_ev);
+	event_add(&pipe_ev, NULL);
+
+	if (!connect_servers(server_group))
+	{
+		LM_ERR("failed to connect to any servers\n");
+		return -1;
+	}
+
+	event_dispatch();
+	return 0;
+}
+
+void timeout_cb(int fd, short event, void *arg) 
+{
+	LM_ERR("message timeout\n");
+	jsonrpc_request_t *req = (jsonrpc_request_t*)arg;
+	json_object *error = json_object_new_string("timeout");
+	void_jsonrpc_request(req->id);
+	close(req->timerfd);
+	event_del(req->timer_ev);
+	pkg_free(req->timer_ev);
+	req->cbfunc(error, req->cbdata, 1);
+	pkg_free(req);
+}
+
+int result_cb(json_object *result, char *data, int error) 
+{
+	struct jsonrpc_pipe_cmd *cmd = (struct jsonrpc_pipe_cmd*)data;
+
+	pv_spec_t *dst = cmd->cb_pv;
+	pv_value_t val;
+
+	const char* res = json_object_get_string(result);
+
+	val.rs.s = (char*)res;
+	val.rs.len = strlen(res);
+	val.flags = PV_VAL_STR;
+
+	dst->setf(0, &dst->pvp, (int)EQ_T, &val);
+
+	int n;
+	if (error) {
+		n = route_get(&main_rt, cmd->err_route);
+	} else {
+		n = route_get(&main_rt, cmd->cb_route);
+	}
+
+	struct action *a = main_rt.rlist[n];
+	tmb.t_continue(cmd->t_hash, cmd->t_label, a);	
+
+	free_pipe_cmd(cmd);
+	return 0;
+}
+
+
+int (*res_cb)(json_object*, char*, int) = &result_cb;
+
+
+void cmd_pipe_cb(int fd, short event, void *arg)
+{
+	struct jsonrpc_pipe_cmd *cmd;
+	/* struct event *ev = (struct event*)arg; */
+
+	if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
+		LM_ERR("failed to read from command pipe: %s\n", strerror(errno));
+		return;
+	}
+
+	json_object *params = json_tokener_parse(cmd->params);
+	json_object *payload = NULL;
+	jsonrpc_request_t *req = NULL;
+
+	if (cmd->notify_only) {
+		payload = build_jsonrpc_notification(cmd->method, params);
+	} else {
+		req = build_jsonrpc_request(cmd->method, params, (char*)cmd, res_cb);
+		if (req)
+			payload = req->payload;
+	}
+
+	if (!payload) {
+		LM_ERR("Failed to build jsonrpc_request_t (method: %s, params: %s)\n", cmd->method, cmd->params);	
+		return;
+	}
+	char *json = (char*)json_object_get_string(payload);
+
+	char *ns; size_t bytes;
+	bytes = netstring_encode_new(&ns, json, (size_t)strlen(json));
+
+	struct jsonrpc_server_group *g;
+	int sent = 0;
+	for (g = server_group; g != NULL; g = g->next_group)
+	{
+		struct jsonrpc_server *s, *first = NULL;
+		for (s = g->next_server; s != first; s = s->next)
+		{
+			if (first == NULL) first = s;
+			if (s->status == JSONRPC_SERVER_CONNECTED) {
+				if (send(s->socket, ns, bytes, 0) == bytes)
+				{
+					sent = 1;
+					break;
+				} else {
+					handle_server_failure(s);
+				}
+			}
+			g->next_server = s->next;
+		}
+		if (sent) {
+			break;
+		} else {
+			LM_WARN("Failed to send on priority group %d... proceeding to next priority group.\n", g->priority);
+		}
+	}
+
+	if (sent && req) {
+		int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+
+		if (timerfd == -1) {
+			LM_ERR("Could not create timerfd.");
+			return;
+		}
+
+		req->timerfd = timerfd;
+		struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
+		CHECK_MALLOC_VOID(itime);
+		itime->it_interval.tv_sec = 0;
+		itime->it_interval.tv_nsec = 0;
+
+		itime->it_value.tv_sec = JSONRPC_TIMEOUT/1000;
+		itime->it_value.tv_nsec = (JSONRPC_TIMEOUT % 1000) * 1000000;
+		if (timerfd_settime(timerfd, 0, itime, NULL) == -1) 
+		{
+			LM_ERR("Could not set timer.");
+			return;
+		}
+		pkg_free(itime);
+		struct event *timer_ev = pkg_malloc(sizeof(struct event));
+		CHECK_MALLOC_VOID(timer_ev);
+		event_set(timer_ev, timerfd, EV_READ, timeout_cb, req); 
+		if(event_add(timer_ev, NULL) == -1) {
+			LM_ERR("event_add failed while setting request timer (%s).", strerror(errno));
+			return;
+		}
+		req->timer_ev = timer_ev;
+	} else if (!sent) {
+		LM_ERR("Request could not be sent... no more failover groups.\n");
+		if (req) {
+			json_object *error = json_object_new_string("failure");
+			void_jsonrpc_request(req->id);
+			req->cbfunc(error, req->cbdata, 1);
+		}
+	}
+
+	pkg_free(ns);
+	json_object_put(payload);
+}
+
+void socket_cb(int fd, short event, void *arg)
+{	
+	struct jsonrpc_server *server = (struct jsonrpc_server*)arg;
+
+	if (event != EV_READ) {
+		LM_ERR("unexpected socket event (%d)\n", event);
+		handle_server_failure(server);	
+		return;
+	}
+
+	char *netstring;
+
+	int retval = netstring_read_fd(fd, &netstring);
+
+	if (retval != 0) {
+		LM_ERR("bad netstring (%d)\n", retval);
+		handle_server_failure(server);
+		return;
+	}	
+
+	struct json_object *res = json_tokener_parse(netstring);
+
+	if (res) {
+		handle_jsonrpc_response(res);
+		json_object_put(res);
+	} else {
+		LM_ERR("netstring could not be parsed: (%s)\n", netstring);
+		handle_server_failure(server);
+	}
+	pkg_free(netstring);
+}
+
+int set_non_blocking(int fd)
+{
+	int flags;
+
+	flags = fcntl(fd, F_GETFL);
+	if (flags < 0)
+		return flags;
+	flags |= O_NONBLOCK;
+	if (fcntl(fd, F_SETFL, flags) < 0)
+		return -1;
+
+	return 0;
+}
+
+int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr)
+{
+	char cpy[strlen(_servers)+1];
+	char *servers = strcpy(cpy, _servers);
+
+	struct jsonrpc_server_group *group = NULL;
+
+	/* parse servers string */
+	char *token = strtok(servers, ":");
+	while (token != NULL) 
+	{
+		char *host, *port_s, *priority_s, *tail;
+		int port, priority;
+		host = token;
+
+		/* validate domain */
+		if (!(isalpha(host[0]) || isdigit(host[0]))) {
+			LM_ERR("invalid domain (1st char is '%c')\n", host[0]);
+			return -1;
+		}
+		int i;
+		for (i=1; i<strlen(host)-1; i++)
+		{
+			if(!(isalpha(host[i]) || isdigit(host[i]) || host[i] == '-' || host[i] == '.'))
+			{
+				LM_ERR("invalid domain (char %d is %c)\n", i, host[i]);
+				return -1;
+			}
+		}
+		if (!(isalpha(host[i]) || isdigit(host[i]))) {
+			LM_ERR("invalid domain (char %d (last) is %c)\n", i, host[i]);
+			return -1;
+		}
+
+		/* convert/validate port */
+		port_s = strtok(NULL, ",");
+		if (port_s == NULL || !(port = strtol(port_s, &tail, 0)) || strlen(tail)) 
+		{
+			LM_ERR("invalid port: %s\n", port_s);
+			return -1;
+		}
+
+		/* convert/validate priority */
+		priority_s = strtok(NULL, " ");
+		if (priority_s == NULL || !(priority = strtol(priority_s, &tail, 0)) || strlen(tail)) 
+		{
+			LM_ERR("invalid priority: %s\n", priority_s);
+			return -1;
+		}
+
+	
+		struct jsonrpc_server *server = pkg_malloc(sizeof(struct jsonrpc_server));
+		CHECK_MALLOC(server);
+		char *h = pkg_malloc(strlen(host)+1);
+		CHECK_MALLOC(h);
+
+		strcpy(h,host);
+		server->host = h;
+		server->port = port;
+		server->status = JSONRPC_SERVER_DISCONNECTED;
+		server->socket = 0;
+
+		int group_cnt = 0;
+
+		/* search for a server group with this server's priority */
+		struct jsonrpc_server_group *selected_group = NULL;
+		for (selected_group=group; selected_group != NULL; selected_group=selected_group->next_group)
+		{
+			if (selected_group->priority == priority) break;
+		}
+		
+		if (selected_group == NULL) {
+			group_cnt++;
+			LM_INFO("Creating group for priority %d\n", priority);
+
+			/* this is the first server for this priority... link it to itself */
+			server->next = server;
+			
+			selected_group = pkg_malloc(sizeof(struct jsonrpc_server_group));
+			CHECK_MALLOC(selected_group);
+			selected_group->priority = priority;
+			selected_group->next_server = server;
+			
+			/* insert the group properly in the linked list */
+			struct jsonrpc_server_group *x, *pg;
+			pg = NULL;
+			if (group == NULL) 
+			{
+				group = selected_group;
+				group->next_group = NULL;
+			} else {
+				for (x = group; x != NULL; x = x->next_group) 
+				{
+					if (priority > x->priority)
+					{
+						if (pg == NULL)
+						{
+							group = selected_group;
+						} else {
+							pg->next_group = selected_group;
+						}
+						selected_group->next_group = x;
+						break;
+					} else if (x->next_group == NULL) {
+						x->next_group = selected_group;
+						break;
+					} else {
+						pg = x;
+					}
+				}
+			}
+		} else {
+			LM_ERR("Using existing group for priority %d\n", priority);
+			server->next = selected_group->next_server->next;
+			selected_group->next_server->next = server;
+		}
+
+		token = strtok(NULL, ":");
+	}
+
+	*group_ptr = group;
+	return 0;
+}
+
+int connect_server(struct jsonrpc_server *server) 
+{	
+	struct sockaddr_in  server_addr;
+	struct hostent      *hp;
+
+	server_addr.sin_family = AF_INET;
+	server_addr.sin_port   = htons(server->port);
+
+	hp = gethostbyname(server->host);
+	if (hp == NULL) {
+		LM_ERR("gethostbyname(%s) failed with h_errno=%d.\n", server->host, h_errno);
+		handle_server_failure(server);
+		return -1;
+	}
+	memcpy(&(server_addr.sin_addr.s_addr), hp->h_addr, hp->h_length);
+
+	int sockfd = socket(AF_INET,SOCK_STREAM,0);
+
+	if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in))) {
+		LM_WARN("Failed to connect to %s on port %d... %s\n", server->host, server->port, strerror(errno));
+		handle_server_failure(server);
+		return -1;
+	}
+
+	if (set_non_blocking(sockfd) != 0)
+	{
+		LM_WARN("Failed to set socket (%s:%d) to non blocking.\n", server->host, server->port);
+		handle_server_failure(server);
+		return -1;
+	}
+
+	server->socket = sockfd;
+	server->status = JSONRPC_SERVER_CONNECTED;
+
+	struct event *socket_ev = pkg_malloc(sizeof(struct event));
+	CHECK_MALLOC(socket_ev);
+	event_set(socket_ev, sockfd, EV_READ | EV_PERSIST, socket_cb, server);
+	event_add(socket_ev, NULL);
+	server->ev = socket_ev;
+	return 0;
+}
+
+int  connect_servers(struct jsonrpc_server_group *group)
+{
+	int connected_servers = 0;
+	for (;group != NULL; group = group->next_group)
+	{
+		struct jsonrpc_server *s, *first = NULL;
+		LM_INFO("Connecting to servers for priority %d:\n", group->priority);
+		for (s=group->next_server;s!=first;s=s->next)
+		{
+			if (connect_server(s) == 0) 
+			{
+				connected_servers++;
+				LM_INFO("Connected to host %s on port %d\n", s->host, s->port);
+			}
+			if (first == NULL) first = s;
+		}
+	}
+	return connected_servers;
+}
+
+void reconnect_cb(int fd, short event, void *arg)
+{
+	LM_INFO("Attempting to reconnect now.");
+	struct jsonrpc_server *server = (struct jsonrpc_server*)arg;
+	
+	if (server->status == JSONRPC_SERVER_CONNECTED) {
+		LM_WARN("Trying to connect an already connected server.");
+		return;
+	}
+
+	if (server->ev != NULL) {
+		event_del(server->ev);
+		pkg_free(server->ev);
+		server->ev = NULL;
+	}
+
+	close(fd);
+	pkg_free(server->timer);
+
+	connect_server(server);
+}
+
+int handle_server_failure(struct jsonrpc_server *server)
+{
+	LM_INFO("Setting timer to reconnect to %s on port %d in %d seconds.\n", server->host, server->port, JSONRPC_RECONNECT_INTERVAL);
+
+	if (server->socket)
+		close(server->socket);
+	server->socket = 0;
+	if (server->ev != NULL) {
+		event_del(server->ev);
+		pkg_free(server->ev);
+		server->ev = NULL;
+	}
+	server->status = JSONRPC_SERVER_FAILURE;
+	int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+	
+	if (timerfd == -1) {
+		LM_ERR("Could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server.");
+		return -1;
+	}
+
+	struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
+	CHECK_MALLOC(itime);
+	itime->it_interval.tv_sec = 0;
+	itime->it_interval.tv_nsec = 0;
+	
+	itime->it_value.tv_sec = JSONRPC_RECONNECT_INTERVAL;
+	itime->it_value.tv_nsec = 0;
+	
+	if (timerfd_settime(timerfd, 0, itime, NULL) == -1) 
+	{
+		LM_ERR("Could not set timer to reschedule connection. No further attempts will be made to reconnect this server.");
+		return -1;
+	}
+	LM_INFO("timerfd value is %d\n", timerfd);
+	struct event *timer_ev = pkg_malloc(sizeof(struct event));
+	CHECK_MALLOC(timer_ev);
+	event_set(timer_ev, timerfd, EV_READ, reconnect_cb, server); 
+	if(event_add(timer_ev, NULL) == -1) {
+		LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.", strerror(errno));
+		return -1;
+	}
+	server->ev = timer_ev;
+	server->timer = itime;
+	return 0;
+}
+
+
+void free_pipe_cmd(struct jsonrpc_pipe_cmd *cmd) 
+{
+	if (cmd->method) 
+		shm_free(cmd->method);
+	if (cmd->params)
+		shm_free(cmd->params);
+	if (cmd->cb_route)
+		shm_free(cmd->cb_route);
+	if (cmd->err_route)
+		shm_free(cmd->err_route);
+	if (cmd->cb_pv)
+		shm_free(cmd->cb_pv);
+	shm_free(cmd);
+}

+ 52 - 0
modules/jsonrpc-c/jsonrpc_io.h

@@ -0,0 +1,52 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Flowroute LLC (flowroute.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#ifndef _JSONRPC_IO_H_
+#define _JSONRPC_IO_H_
+
+#include "../../route_struct.h"
+#include "../../pvar.h"
+
+#define JSONRPC_SERVER_CONNECTED    1
+#define JSONRPC_SERVER_DISCONNECTED 2
+#define JSONRPC_SERVER_FAILURE      3
+
+/* interval (in seconds) at which failed servers are retried */
+#define JSONRPC_RECONNECT_INTERVAL  3
+
+/* time (in ms) after which the error route is called */
+#define JSONRPC_TIMEOUT 			500
+
+struct jsonrpc_pipe_cmd 
+{
+	char *method, *params, *cb_route, *err_route;
+	unsigned int t_hash, t_label, notify_only;
+	pv_spec_t *cb_pv;
+	struct sip_msg *msg;
+};
+
+int jsonrpc_io_child_process(int data_pipe, char* servers);
+void free_pipe_cmd(struct jsonrpc_pipe_cmd *cmd); 
+
+#endif /* _JSONRPC_IO_H_ */

+ 172 - 0
modules/jsonrpc-c/jsonrpc_mod.c

@@ -0,0 +1,172 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Flowroute LLC (flowroute.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <errno.h>
+
+#include "../../mod_fix.h"
+#include "../../trim.h"
+#include "../../sr_module.h"
+#include "../tm/tm_load.h"
+
+#include "jsonrpc_request.h"
+#include "jsonrpc_io.h"
+#include "jsonrpc.h"
+
+
+MODULE_VERSION
+
+
+static int mod_init(void);
+static int child_init(int);
+static int fixup_request(void** param, int param_no);
+static int fixup_notification(void** param, int param_no);
+static int fixup_request_free(void** param, int param_no);
+int        fixup_pvar_shm(void** param, int param_no);
+
+char *servers_param;
+int  pipe_fds[2] = {-1,-1};
+
+struct tm_binds tmb;
+
+/*
+ * Exported Functions
+ */
+static cmd_export_t cmds[]={
+	{"jsonrpc_request", (cmd_function)jsonrpc_request, 5, fixup_request, fixup_request_free, ANY_ROUTE},
+	{"jsonrpc_notification", (cmd_function)jsonrpc_notification, 2, fixup_notification, 0, ANY_ROUTE},
+	{0, 0, 0, 0, 0, 0}
+};
+ 
+
+/*
+ * Script Parameters
+ */
+static param_export_t mod_params[]={
+	{"servers", STR_PARAM, &servers_param},
+	{ 0,0,0 }
+};
+
+ 
+/*
+ * Exports
+ */
+struct module_exports exports = {
+		"jsonrpc",           /* module name */
+		DEFAULT_DLFLAGS,     /* dlopen flags */
+		cmds,                /* Exported functions */
+		mod_params,          /* Exported parameters */
+		0,                   /* exported statistics */
+		0,                   /* exported MI functions */
+		0,                   /* exported pseudo-variables */
+		0,                   /* extra processes */
+		mod_init,            /* module initialization function */
+		0,                   /* response function*/
+		0,                   /* destroy function */
+		child_init           /* per-child init function */
+};
+
+
+static int mod_init(void) {
+	load_tm_f  load_tm;
+
+	/* load the tm functions  */
+	if ( !(load_tm=(load_tm_f)find_export("load_tm", NO_SCRIPT, 0)))
+	{
+		LOG(L_ERR, "ERROR:jsonrpc:mod_init: cannot import load_tm\n");
+		return -1;
+	}
+	/* let the auto-loading function load all TM stuff */
+	if (load_tm( &tmb )==-1)
+		return -1;
+
+	if (servers_param == NULL) {
+		LM_ERR("servers parameter missing.\n");
+		return -1;
+	}
+
+	register_procs(1);
+
+	if (pipe(pipe_fds) < 0) {
+		LM_ERR("pipe() failed\n");
+		return -1;
+	}
+	
+	return(0);
+}
+
+static int child_init(int rank) 
+{
+	int pid;
+	
+	if (rank>PROC_MAIN)
+		cmd_pipe = pipe_fds[1];
+
+	if (rank!=PROC_MAIN)
+		return 0;
+
+	pid=fork_process(PROC_NOCHLDINIT, "jsonrpc io handler", 1);
+	if (pid<0)
+		return -1; /* error */
+	if(pid==0){
+		/* child */
+		close(pipe_fds[1]);
+		return jsonrpc_io_child_process(pipe_fds[0], servers_param);
+	}
+	return 0;
+}
+
+/* Fixup Functions */
+
+static int fixup_request(void** param, int param_no)
+{
+  if (param_no <= 4) {
+		return fixup_spve_null(param, 1);
+	} else if (param_no == 5) {
+		return fixup_pvar_null(param, 1);
+	}
+	LM_ERR("jsonrpc_request takes exactly 5 parameters.\n");
+	return -1;
+}
+
+static int fixup_notification(void** param, int param_no)
+{
+  if (param_no <= 2) {
+		return fixup_spve_null(param, 1);
+	}
+	LM_ERR("jsonrpc_notification takes exactly 2 parameters.\n");
+	return -1;
+}
+
+static int fixup_request_free(void** param, int param_no)
+{
+  if (param_no <= 4) {
+		return 0;
+	} else if (param_no == 5) {
+		return fixup_free_pvar_null(param, 1);
+	}
+	LM_ERR("jsonrpc_request takes exactly 5 parameters.\n");
+	return -1;
+}

+ 165 - 0
modules/jsonrpc-c/jsonrpc_request.c

@@ -0,0 +1,165 @@
+/**
+ * $Id$
+*
+ * Copyright (C) 2011 Flowroute LLC (flowroute.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include "../../mod_fix.h"
+#include "../../pvar.h"
+#include "../../lvalue.h"
+#include "../tm/tm_load.h"
+
+#include "jsonrpc_request.h"
+#include "jsonrpc_io.h"
+
+
+struct tm_binds tmb;
+static char *shm_strdup(str *src);
+
+int memory_error() {
+	LM_ERR("Out of memory!");
+	return -1;
+}
+
+int jsonrpc_request(struct sip_msg* _m, char* _method, char* _params, char* _cb_route, char* _err_route, char* _cb_pv)
+{
+  str method;
+  str params;
+  str cb_route;
+  str err_route;
+	
+
+	if (fixup_get_svalue(_m, (gparam_p)_method, &method) != 0) {
+		LM_ERR("cannot get method value\n");
+		return -1;
+	}
+	if (fixup_get_svalue(_m, (gparam_p)_params, &params) != 0) {
+		LM_ERR("cannot get params value\n");
+		return -1;
+	}
+	if (fixup_get_svalue(_m, (gparam_p)_cb_route, &cb_route) != 0) {
+		LM_ERR("cannot get cb_route value\n");
+		return -1;
+	}
+
+	if (fixup_get_svalue(_m, (gparam_p)_err_route, &err_route) != 0) {
+		LM_ERR("cannot get err_route value\n");
+		return -1;
+	}
+
+	tm_cell_t *t = 0;
+	t = tmb.t_gett();
+	if (t==NULL || t==T_UNDEFINED)
+	{
+		if(tmb.t_newtran(_m)<0)
+		{
+			LM_ERR("cannot create the transaction\n");
+			return -1;
+		}
+		t = tmb.t_gett();
+		if (t==NULL || t==T_UNDEFINED)
+		{
+			LM_ERR("cannot look up the transaction\n");
+			return -1;
+		}
+	}
+
+	unsigned int hash_index;
+	unsigned int label;
+
+	if (tmb.t_suspend(_m, &hash_index, &label) < 0) {
+		LM_ERR("t_suspend() failed\n");
+		return -1;
+	}
+
+	struct jsonrpc_pipe_cmd *cmd;
+	if (!(cmd = (struct jsonrpc_pipe_cmd *) shm_malloc(sizeof(struct jsonrpc_pipe_cmd))))
+		return memory_error();
+
+	memset(cmd, 0, sizeof(struct jsonrpc_pipe_cmd));
+
+	pv_spec_t *cb_pv = (pv_spec_t*)shm_malloc(sizeof(pv_spec_t));
+	if (!cb_pv)
+		return memory_error();
+
+	cb_pv = memcpy(cb_pv, (pv_spec_t *)_cb_pv, sizeof(pv_spec_t));
+
+	cmd->method = shm_strdup(&method);
+	cmd->params = shm_strdup(&params);
+	cmd->cb_route = shm_strdup(&cb_route);
+	cmd->err_route = shm_strdup(&err_route);
+	cmd->cb_pv = cb_pv;
+	cmd->msg = _m;
+	cmd->t_hash = hash_index;
+	cmd->t_label = label;
+	
+	if (write(cmd_pipe, &cmd, sizeof(cmd)) != sizeof(cmd)) {
+		LM_ERR("failed to write to io pipe: %s\n", strerror(errno));
+		return -1;
+	}
+
+	return 0;
+}
+
+int jsonrpc_notification(struct sip_msg* _m, char* _method, char* _params)
+{
+	str method;
+	str params;
+
+	if (fixup_get_svalue(_m, (gparam_p)_method, &method) != 0) {
+		LM_ERR("cannot get method value\n");
+		return -1;
+	}
+	if (fixup_get_svalue(_m, (gparam_p)_params, &params) != 0) {
+		LM_ERR("cannot get params value\n");
+		return -1;
+	}
+
+	struct jsonrpc_pipe_cmd *cmd;
+	if (!(cmd = (struct jsonrpc_pipe_cmd *) shm_malloc(sizeof(struct jsonrpc_pipe_cmd))))
+		return memory_error();
+
+	memset(cmd, 0, sizeof(struct jsonrpc_pipe_cmd));
+
+	cmd->method = shm_strdup(&method);
+	cmd->params = shm_strdup(&params);
+	cmd->notify_only = 1;
+
+	if (write(cmd_pipe, &cmd, sizeof(cmd)) != sizeof(cmd)) {
+		LM_ERR("failed to write to io pipe: %s\n", strerror(errno));
+		return -1;
+	}
+
+	return 1;
+}
+
+static char *shm_strdup(str *src)
+{
+	char *res;
+
+	if (!src || !src->s)
+		return NULL;
+	if (!(res = (char *) shm_malloc(src->len + 1)))
+		return NULL;
+	strncpy(res, src->s, src->len);
+	res[src->len] = 0;
+	return res;
+}

+ 33 - 0
modules/jsonrpc-c/jsonrpc_request.h

@@ -0,0 +1,33 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Flowroute LLC (flowroute.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#ifndef _JSONRPC_REQUEST_H_
+#define _JSONRPC_REQUEST_H_
+#include "../../parser/msg_parser.h"
+
+int jsonrpc_request(struct sip_msg* msg, char* method, char* params, char* cb_route, char* err_route, char* cb_pv);
+int jsonrpc_notification(struct sip_msg* msg, char* method, char* params);
+int cmd_pipe;
+
+#endif /* _JSONRPC_REQUEST_H_ */

+ 198 - 0
modules/jsonrpc-c/netstring.c

@@ -0,0 +1,198 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Flowroute LLC (flowroute.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <ctype.h>
+#include <math.h>
+#include "netstring.h"
+
+#include "../../sr_module.h"
+#include "../../mem/mem.h"
+
+
+int netstring_read_fd(int fd, char **netstring) {
+  int i, bytes;
+	size_t len = 0;
+
+ 	*netstring = NULL;
+
+	char buffer[10]={0};
+	
+	/* Peek at first 10 bytes, to get length and colon */
+	bytes = recv(fd,buffer,10,MSG_PEEK);
+
+	if (bytes<3) return NETSTRING_ERROR_TOO_SHORT;
+	
+  /* No leading zeros allowed! */
+  if (buffer[0] == '0' && isdigit(buffer[1]))
+    return NETSTRING_ERROR_LEADING_ZERO;
+
+  /* The netstring must start with a number */
+  if (!isdigit(buffer[0])) return NETSTRING_ERROR_NO_LENGTH;
+
+  /* Read the number of bytes */
+  for (i = 0; i < bytes && isdigit(buffer[i]); i++) {
+    /* Error if more than 9 digits */
+    if (i >= 9) return NETSTRING_ERROR_TOO_LONG;
+    /* Accumulate each digit, assuming ASCII. */
+    len = len*10 + (buffer[i] - '0');
+  }
+
+  /* Read the colon */
+  if (buffer[i++] != ':') return NETSTRING_ERROR_NO_COLON;
+	
+	/* Read the whole string from the buffer */
+	size_t read_len = i+len+1;
+	char *buffer2 = pkg_malloc(read_len);
+	if (!buffer2) {
+		LM_ERR("Out of memory!");
+		return -1;
+	}
+	bytes = recv(fd,buffer2,read_len,0);
+
+  /* Make sure we got the whole netstring */
+  if (read_len > bytes) return NETSTRING_ERROR_TOO_SHORT;
+	
+  /* Test for the trailing comma */
+  if (buffer2[read_len-1] != ',') return NETSTRING_ERROR_NO_COMMA;
+
+	buffer2[read_len-1] = '\0';
+	
+	int x;
+		
+	for(x=0;x<=read_len-i-1;x++) {
+		buffer2[x]=buffer2[x+i];
+	}
+	
+	*netstring = buffer2;
+  return 0;
+}
+
+
+/* Reads a netstring from a `buffer` of length `buffer_length`. Writes
+   to `netstring_start` a pointer to the beginning of the string in
+   the buffer, and to `netstring_length` the length of the
+   string. Does not allocate any memory. If it reads successfully,
+   then it returns 0. If there is an error, then the return value will
+   be negative. The error values are:
+
+   NETSTRING_ERROR_TOO_LONG      More than 999999999 bytes in a field
+   NETSTRING_ERROR_NO_COLON      No colon was found after the number
+   NETSTRING_ERROR_TOO_SHORT     Number of bytes greater than buffer length
+   NETSTRING_ERROR_NO_COMMA      No comma was found at the end
+   NETSTRING_ERROR_LEADING_ZERO  Leading zeros are not allowed
+   NETSTRING_ERROR_NO_LENGTH     Length not given at start of netstring
+
+   If you're sending messages with more than 999999999 bytes -- about
+   2 GB -- then you probably should not be doing so in the form of a
+   single netstring. This restriction is in place partially to protect
+   from malicious or erroneous input, and partly to be compatible with
+   D. J. Bernstein's reference implementation.
+
+   Example:
+      if (netstring_read("3:foo,", 6, &str, &len) < 0) explode_and_die();
+ */
+
+int netstring_read(char *buffer, size_t buffer_length,
+		   char **netstring_start, size_t *netstring_length) {
+  int i;
+  size_t len = 0;
+
+  /* Write default values for outputs */
+  *netstring_start = NULL; *netstring_length = 0;
+
+  /* Make sure buffer is big enough. Minimum size is 3. */
+  if (buffer_length < 3) return NETSTRING_ERROR_TOO_SHORT;
+
+  /* No leading zeros allowed! */
+  if (buffer[0] == '0' && isdigit(buffer[1]))
+    return NETSTRING_ERROR_LEADING_ZERO;
+
+  /* The netstring must start with a number */
+  if (!isdigit(buffer[0])) return NETSTRING_ERROR_NO_LENGTH;
+
+  /* Read the number of bytes */
+  for (i = 0; i < buffer_length && isdigit(buffer[i]); i++) {
+    /* Error if more than 9 digits */
+    if (i >= 9) return NETSTRING_ERROR_TOO_LONG;
+    /* Accumulate each digit, assuming ASCII. */
+    len = len*10 + (buffer[i] - '0');
+  }
+
+  /* Check buffer length once and for all. Specifically, we make sure
+     that the buffer is longer than the number we've read, the length
+     of the string itself, and the colon and comma. */
+  if (i + len + 1 >= buffer_length) return NETSTRING_ERROR_TOO_SHORT;
+
+  /* Read the colon */
+  if (buffer[i++] != ':') return NETSTRING_ERROR_NO_COLON;
+  
+  /* Test for the trailing comma, and set the return values */
+  if (buffer[i + len] != ',') return NETSTRING_ERROR_NO_COMMA;
+  *netstring_start = &buffer[i]; *netstring_length = len;
+
+  return 0;
+}
+
+/* Return the length, in ASCII characters, of a netstring containing
+   `data_length` bytes. */
+size_t netstring_buffer_size(size_t data_length) {
+  if (data_length == 0) return 3;
+  return (size_t)ceil(log10((double)data_length + 1)) + data_length + 2;
+}
+
+/* Allocate and create a netstring containing the first `len` bytes of
+   `data`. This must be manually freed by the client. If `len` is 0
+   then no data will be read from `data`, and it may be NULL. */
+size_t netstring_encode_new(char **netstring, char *data, size_t len) {
+  char *ns;
+  size_t num_len = 1;
+
+  if (len == 0) {
+    ns = pkg_malloc(3);
+	if (!ns) {
+		LM_ERR("Out of memory!");
+		return 0;
+	}
+    ns[0] = '0';
+    ns[1] = ':';
+    ns[2] = ',';
+  } else {
+    num_len = (size_t)ceil(log10((double)len + 1));
+    ns = pkg_malloc(num_len + len + 2);
+	if (!ns) {
+		LM_ERR("Out of memory!");
+		return 0;
+	}
+    sprintf(ns, "%lu:", (unsigned long)len);
+    memcpy(ns + num_len + 1, data, len);
+    ns[num_len + len + 1] = ',';
+  }
+
+  *netstring = ns;
+  return num_len + len + 2;
+}
+  

+ 48 - 0
modules/jsonrpc-c/netstring.h

@@ -0,0 +1,48 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Flowroute LLC (flowroute.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#ifndef __NETSTRING_STREAM_H
+#define __NETSTRNG_STREAM_H
+
+#include <string.h>
+
+int netstring_read_fd(int fd, char **netstring);
+
+int netstring_read(char *buffer, size_t buffer_length,
+		   char **netstring_start, size_t *netstring_length);
+
+size_t netstring_buffer_size(size_t data_length);
+
+size_t netstring_encode_new(char **netstring, char *data, size_t len);
+
+/* Errors that can occur during netstring parsing */
+#define NETSTRING_ERROR_TOO_LONG     -1
+#define NETSTRING_ERROR_NO_COLON     -2
+#define NETSTRING_ERROR_TOO_SHORT    -3
+#define NETSTRING_ERROR_NO_COMMA     -4
+#define NETSTRING_ERROR_LEADING_ZERO -5
+#define NETSTRING_ERROR_NO_LENGTH    -6
+#define NETSTRING_ERROR_BAD_FD       -7
+
+#endif