Переглянути джерело

new modules: ndb_redis - connector to redis server

- redis - nosql database engine
- uses hiredis library
- exports function to send commands to redis and a new pseudo-variable
  class to access the reply: $redis(key)
Daniel-Constantin Mierla 14 роки тому
батько
коміт
286a14e888

+ 16 - 0
modules/ndb_redis/Makefile

@@ -0,0 +1,16 @@
+#
+# WARNING: do not run this directly, it should be run by the master Makefile
+
+include ../../Makefile.defs
+auto_gen=
+NAME=ndb_redis.so
+
+DEFS += -I/usr/local/include
+LIBS = -lhiredis
+
+DEFS+=-DOPENSER_MOD_INTERFACE
+
+SERLIBPATH=../../lib
+SER_LIBS+=$(SERLIBPATH)/kcore/kcore
+
+include ../../Makefile.modules

+ 405 - 0
modules/ndb_redis/ndb_redis_mod.c

@@ -0,0 +1,405 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+#include "../../sr_module.h"
+#include "../../mem/mem.h"
+#include "../../dprint.h"
+#include "../../mod_fix.h"
+#include "../../trim.h"
+
+#include "redis_client.h"
+
+MODULE_VERSION
+
+/** parameters */
+
+int redis_srv_param(modparam_t type, void *val);
+static int w_redis_cmd3(struct sip_msg* msg, char* ssrv, char* scmd,
+		char* sres);
+static int w_redis_cmd4(struct sip_msg* msg, char* ssrv, char* scmd,
+		char *sargv1, char* sres);
+static int w_redis_cmd5(struct sip_msg* msg, char* ssrv, char* scmd,
+		char *sargv1, char *sargv2, char* sres);
+static int w_redis_cmd6(struct sip_msg* msg, char* ssrv, char* scmd,
+		char *sargv1, char *sargv2, char *sargv3, char* sres);
+static int fixup_redis_cmd6(void** param, int param_no);
+
+static int  mod_init(void);
+static void mod_destroy(void);
+static int  child_init(int rank);
+
+static int pv_get_redisc(struct sip_msg *msg,  pv_param_t *param,
+		pv_value_t *res);
+static int pv_parse_redisc_name(pv_spec_p sp, str *in);
+
+static pv_export_t mod_pvs[] = {
+	{ {"redis", sizeof("redis")-1}, PVT_OTHER, pv_get_redisc, 0,
+		pv_parse_redisc_name, 0, 0, 0 },
+	{ {0, 0}, 0, 0, 0, 0, 0, 0, 0 }
+};
+
+
+static cmd_export_t cmds[]={
+	{"redis_cmd", (cmd_function)w_redis_cmd3, 3, fixup_redis_cmd6,
+		0, ANY_ROUTE},
+	{"redis_cmd", (cmd_function)w_redis_cmd4, 4, fixup_redis_cmd6,
+		0, ANY_ROUTE},
+	{"redis_cmd", (cmd_function)w_redis_cmd5, 5, fixup_redis_cmd6,
+		0, ANY_ROUTE},
+	{"redis_cmd", (cmd_function)w_redis_cmd6, 6, fixup_redis_cmd6,
+		0, ANY_ROUTE},
+	{0, 0, 0, 0, 0, 0}
+};
+
+static param_export_t params[]={
+	{"server",         STR_PARAM|USE_FUNC_PARAM, (void*)redis_srv_param},
+	{0, 0, 0}
+};
+
+struct module_exports exports = {
+	"ndb_redis",
+	DEFAULT_DLFLAGS, /* dlopen flags */
+	cmds,
+	params,
+	0,
+	0,              /* exported MI functions */
+	mod_pvs,        /* exported pseudo-variables */
+	0,              /* extra processes */
+	mod_init,       /* module initialization function */
+	0,              /* response function */
+	mod_destroy,    /* destroy function */
+	child_init      /* per child init function */
+};
+
+
+
+/**
+ * init module function
+ */
+static int mod_init(void)
+{
+	/* success code */
+	return 0;
+}
+
+/* each child get a new connection to the database */
+static int child_init(int rank)
+{
+	/* skip child init for non-worker process ranks */
+	if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN)
+		return 0;
+
+	if(redisc_init()<0)
+	{
+		LM_ERR("failed to initialize redis connections\n");
+		return -1;
+	}
+	return 0;
+}
+
+/**
+ *
+ */
+static void mod_destroy(void)
+{
+	LM_DBG("cleaning up\n");
+	redisc_destroy();
+}
+
+/**
+ *
+ */
+static int w_redis_cmd3(struct sip_msg* msg, char* ssrv, char* scmd, char* sres)
+{
+	str s[3];
+
+	if(fixup_get_svalue(msg, (gparam_t*)ssrv, &s[0])!=0)
+	{
+		LM_ERR("no redis server name\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)scmd, &s[1])!=0)
+	{
+		LM_ERR("no redis command\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)sres, &s[2])!=0)
+	{
+		LM_ERR("no redis reply name\n");
+		return -1;
+	}
+
+	if(redisc_exec(&s[0], &s[1], NULL, NULL, NULL, &s[2])<0)
+		return -1;
+	return 1;
+}
+
+/**
+ *
+ */
+static int w_redis_cmd4(struct sip_msg* msg, char* ssrv, char* scmd,
+		char *sargv1, char* sres)
+{
+	str s[4];
+
+	if(fixup_get_svalue(msg, (gparam_t*)ssrv, &s[0])!=0)
+	{
+		LM_ERR("no redis server name\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)scmd, &s[1])!=0)
+	{
+		LM_ERR("no redis command\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)sargv1, &s[2])!=0)
+	{
+		LM_ERR("no argument 1\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)sres, &s[3])!=0)
+	{
+		LM_ERR("no redis reply name\n");
+		return -1;
+	}
+
+	if(redisc_exec(&s[0], &s[1], &s[2], NULL, NULL, &s[3])<0)
+		return -1;
+	return 1;
+}
+
+/**
+ *
+ */
+static int w_redis_cmd5(struct sip_msg* msg, char* ssrv, char* scmd,
+		char *sargv1, char *sargv2, char* sres)
+{
+	str s[5];
+
+	if(fixup_get_svalue(msg, (gparam_t*)ssrv, &s[0])!=0)
+	{
+		LM_ERR("no redis server name\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)scmd, &s[1])!=0)
+	{
+		LM_ERR("no redis command\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)sargv1, &s[2])!=0)
+	{
+		LM_ERR("no argument 1\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)sargv2, &s[3])!=0)
+	{
+		LM_ERR("no argument 2\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)sres, &s[4])!=0)
+	{
+		LM_ERR("no redis reply name\n");
+		return -1;
+	}
+
+	if(redisc_exec(&s[0], &s[1], &s[2], &s[3], NULL, &s[4])<0)
+		return -1;
+	return 1;
+}
+
+/**
+ *
+ */
+static int w_redis_cmd6(struct sip_msg* msg, char* ssrv, char* scmd,
+		char *sargv1, char *sargv2, char *sargv3, char* sres)
+{
+	str s[6];
+
+	if(fixup_get_svalue(msg, (gparam_t*)ssrv, &s[0])!=0)
+	{
+		LM_ERR("no redis server name\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)scmd, &s[1])!=0)
+	{
+		LM_ERR("no redis command\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)sargv1, &s[2])!=0)
+	{
+		LM_ERR("no argument 1\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)sargv2, &s[3])!=0)
+	{
+		LM_ERR("no argument 2\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)sargv3, &s[4])!=0)
+	{
+		LM_ERR("no argument 3\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)sres, &s[5])!=0)
+	{
+		LM_ERR("no redis reply name\n");
+		return -1;
+	}
+
+	if(redisc_exec(&s[0], &s[1], &s[2], &s[3], &s[4], &s[5])<0)
+		return -1;
+	return 1;
+}
+
+/**
+ *
+ */
+static int fixup_redis_cmd6(void** param, int param_no)
+{
+	return fixup_spve_null(param, 1);
+}
+
+
+/**
+ *
+ */
+int redis_srv_param(modparam_t type, void *val)
+{
+	return redisc_add_server((char*)val);
+}
+
+
+/**
+ *
+ */
+static int pv_parse_redisc_name(pv_spec_p sp, str *in)
+{
+	redisc_pv_t *rpv=NULL;
+	str pvs;
+	int i;
+
+	if(in->s==NULL || in->len<=0)
+		return -1;
+
+	rpv = (redisc_pv_t*)pkg_malloc(sizeof(redisc_pv_t));
+	if(rpv==NULL)
+		return -1;
+
+	memset(rpv, 0, sizeof(redisc_pv_t));
+
+	pvs = *in;
+	trim(&pvs);
+
+	rpv->rname.s = pvs.s;
+	for(i=0; i<pvs.len-2; i++)
+	{
+		if(pvs.s[i]=='=')
+		{
+			if(pvs.s[i+1]!='>')
+			{
+				LM_ERR("invalid var spec [%.*s]\n",
+						in->len, in->s);
+				pkg_free(rpv);
+				return -1;
+			}
+			rpv->rname.len = i;
+			break;
+		}
+	}
+
+	if(rpv->rname.len==0)
+	{
+		LM_ERR("invalid var spec [%.*s]\n", in->len, in->s);
+		pkg_free(rpv);
+		return -1;
+	}
+	i += 2;
+	rpv->rkey.s   = pvs.s + i;
+	rpv->rkey.len = pvs.len - i;
+
+	if(rpv->rkey.len==5 && strncmp(rpv->rkey.s, "value", 5)==0) {
+		rpv->rkeyid = 1;
+	} else if(rpv->rkey.len==4 && strncmp(rpv->rkey.s, "type", 4)==0) {
+		rpv->rkeyid = 0;
+	} else if(rpv->rkey.len==4 && strncmp(rpv->rkey.s, "info", 4)==0) {
+		rpv->rkeyid = 2;
+	} else {
+		LM_ERR("invalid key spec in [%.*s]\n", in->len, in->s);
+		pkg_free(rpv);
+		return -1;
+	}
+
+	sp->pvp.pvn.u.dname = (void*)rpv;
+	sp->pvp.pvn.type = PV_NAME_OTHER;
+	return 0;
+
+}
+
+/**
+ *
+ */
+static int pv_get_redisc(struct sip_msg *msg,  pv_param_t *param,
+		pv_value_t *res)
+{
+	redisc_pv_t *rpv;
+	str s;
+
+	rpv = (redisc_pv_t*)param->pvn.u.dname;
+	if(rpv->reply==NULL)
+	{
+		rpv->reply = redisc_get_reply(&rpv->rname);
+		if(rpv->reply==NULL)
+			return pv_get_null(msg, param, res);
+	}
+
+	if(rpv->reply->rplRedis==NULL)
+		return pv_get_null(msg, param, res);
+
+	switch(rpv->rkeyid) {
+		case 1:
+			switch(rpv->reply->rplRedis->type) {
+				case REDIS_REPLY_STRING:
+					s.len = rpv->reply->rplRedis->len;
+					s.s = rpv->reply->rplRedis->str;
+					return pv_get_strval(msg, param, res, &s);
+				case REDIS_REPLY_INTEGER:
+					return pv_get_sintval(msg, param, res,
+							(int)rpv->reply->rplRedis->integer);
+				default:
+					return pv_get_null(msg, param, res);
+			}
+		case 2:
+			if(rpv->reply->rplRedis->str==NULL)
+				return pv_get_null(msg, param, res);
+			s.len = rpv->reply->rplRedis->len;
+			s.s = rpv->reply->rplRedis->str;
+			return pv_get_strval(msg, param, res, &s);
+		default:
+			return pv_get_sintval(msg, param, res,
+					rpv->reply->rplRedis->type);
+	}
+}

+ 256 - 0
modules/ndb_redis/redis_client.c

@@ -0,0 +1,256 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+
+#include "../../mem/mem.h"
+#include "../../dprint.h"
+#include "../../hashes.h"
+#include "../../ut.h"
+
+#include "redis_client.h"
+
+static redisc_server_t *_redisc_srv_list=NULL;
+
+static redisc_reply_t *_redisc_rpl_list=NULL;
+
+/**
+ *
+ */
+int redisc_init(void)
+{
+	char *addr;
+	unsigned int port;
+	redisc_server_t *rsrv=NULL;
+	param_t *pit = NULL;
+
+	if(_redisc_srv_list==NULL)
+	{
+		LM_ERR("no redis servers defined\n");
+		return -1;
+	}
+
+	for(rsrv=_redisc_srv_list; rsrv; rsrv=rsrv->next)
+	{
+		addr = "127.0.0.1";
+		port = 6379;
+		for (pit = rsrv->attrs; pit; pit=pit->next)
+		{
+			if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
+				addr = pit->body.s;
+				addr[pit->body.len] = '\0';
+			} else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
+				if(!str2int(&pit->body, &port))
+					port = 6379;
+			}
+		}
+		rsrv->ctxRedis = redisConnect(addr, port);
+		if (rsrv->ctxRedis->err) {
+			LM_ERR("failed to connect to redis server [%.*s]: %s\n",
+					rsrv->sname->len, rsrv->sname->s, rsrv->ctxRedis->errstr);
+			return -1;
+		} else {
+			LM_DBG("connected to redis server [%.*s] (%s:%d)\n",
+					rsrv->sname->len, rsrv->sname->s, addr, port);
+		}
+	}
+	return 0;
+}
+
+/**
+ *
+ */
+int redisc_destroy(void)
+{
+	redisc_server_t *rsrv=NULL;
+	redisc_server_t *rsrv1=NULL;
+	if(_redisc_srv_list==NULL)
+		return -1;
+	rsrv=_redisc_srv_list;
+	while(rsrv!=NULL)
+	{
+		rsrv1 = rsrv;
+		rsrv=rsrv->next;
+		if(rsrv1->ctxRedis!=NULL)
+			redisFree(rsrv1->ctxRedis);
+		free_params(rsrv1->attrs);
+		pkg_free(rsrv1);
+	}
+	return 0;
+}
+
+/**
+ *
+ */
+int redisc_add_server(char *spec)
+{
+	param_t *pit=NULL;
+	param_hooks_t phooks;
+	redisc_server_t *rsrv=NULL;
+	str s;
+
+	s.s = spec;
+	s.len = strlen(spec);
+	if(s.s[s.len-1]==';')
+		s.len--;
+	if (parse_params(&s, CLASS_ANY, &phooks, &pit)<0)
+	{
+		LM_ERR("failed parsing params value\n");
+		goto error;
+	}
+	rsrv = (redisc_server_t*)pkg_malloc(sizeof(redisc_server_t));
+	if(rsrv==NULL)
+	{
+		LM_ERR("no more pkg\n");
+		goto error;
+	}
+	memset(rsrv, 0, sizeof(redisc_server_t));
+	rsrv->attrs = pit;
+	for (pit = rsrv->attrs; pit; pit=pit->next)
+	{
+		if(pit->name.len==4 && strncmp(pit->name.s, "name", 4)==0) {
+			rsrv->sname = &pit->body;
+			rsrv->hname = get_hash1_raw(rsrv->sname->s, rsrv->sname->len);
+			break;
+		}
+	}
+	if(rsrv->sname==NULL)
+	{
+		LM_ERR("no server name\n");
+		goto error;
+	}
+	rsrv->next = _redisc_srv_list;
+	_redisc_srv_list = rsrv;
+
+	return 0;
+error:
+	if(pit!=NULL)
+		free_params(pit);
+	if(rsrv!=NULL)
+		pkg_free(rsrv);
+	return -1;
+}
+
+/**
+ *
+ */
+redisc_server_t *redisc_get_server(str *name)
+{
+	redisc_server_t *rsrv=NULL;
+	unsigned int hname;
+
+	hname = get_hash1_raw(name->s, name->len);
+	rsrv=_redisc_srv_list;
+	while(rsrv!=NULL)
+	{
+		if(rsrv->hname==hname && rsrv->sname->len==name->len
+				&& strncmp(rsrv->sname->s, name->s, name->len)==0)
+			return rsrv;
+		rsrv=rsrv->next;
+	}
+	return NULL;
+}
+
+/**
+ *
+ */
+int redisc_exec(str *srv, str *cmd, str *argv1, str *argv2, str *argv3,
+		str *res)
+{
+	redisc_server_t *rsrv=NULL;
+	redisc_reply_t *rpl;
+	char c;
+
+	rsrv = redisc_get_server(srv);
+	if(srv==NULL || cmd==NULL || res==NULL)
+	{
+		LM_ERR("invalid parameters");
+		return -1;
+	}
+	if(rsrv==NULL)
+	{
+		LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
+		return -1;
+	}
+	if(rsrv->ctxRedis==NULL)
+	{
+		LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
+		return -1;
+	}
+	rpl = redisc_get_reply(res);
+	if(rpl==NULL)
+	{
+		LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
+		return -1;
+	}
+	c = cmd->s[cmd->len];
+	cmd->s[cmd->len] = '\0';
+	rpl->rplRedis = redisCommand(rsrv->ctxRedis, cmd->s);
+	cmd->s[cmd->len] = c;
+	return 0;
+}
+
+
+/**
+ *
+ */
+redisc_reply_t *redisc_get_reply(str *name)
+{
+	redisc_reply_t *rpl;
+	unsigned int hid;
+
+	hid = get_hash1_raw(name->s, name->len);
+
+	for(rpl=_redisc_rpl_list; rpl; rpl=rpl->next) {
+		if(rpl->hname==hid && rpl->rname.len==name->len
+				&& strncmp(rpl->rname.s, name->s, name->len)==0)
+			return rpl;
+	}
+	/* not found - add a new one */
+
+	rpl = (redisc_reply_t*)pkg_malloc(sizeof(redisc_reply_t));
+	if(rpl==NULL)
+	{
+		LM_ERR("no more pkg\n");
+		return NULL;
+	}
+	memset(rpl, 0, sizeof(redisc_reply_t));
+	rpl->hname = hid;
+	rpl->rname.s = (char*)pkg_malloc(name->len+1);
+	if(rpl->rname.s==NULL)
+	{
+		LM_ERR("no more pkg.\n");
+		pkg_free(rpl);
+		return NULL;
+	}
+	strncpy(rpl->rname.s, name->s, name->len);
+	rpl->rname.len = name->len;
+	rpl->rname.s[name->len] = '\0';
+	rpl->next = _redisc_rpl_list;
+	_redisc_rpl_list = rpl;
+	return rpl;
+}

+ 61 - 0
modules/ndb_redis/redis_client.h

@@ -0,0 +1,61 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#ifndef _REDIS_CLIENT_H_
+#define _REDIS_CLIENT_H_
+
+#include <hiredis/hiredis.h>
+
+#include "../../str.h"
+#include "../../parser/parse_param.h"
+
+int redisc_init(void);
+int redisc_destroy(void);
+int redisc_add_server(char *spec);
+int redisc_exec(str *srv, str *cmd, str *argv1, str *argv2, str *argv3,
+		str *res);
+
+typedef struct redisc_server {
+	str *sname;
+	unsigned int hname;
+	param_t *attrs;
+	redisContext *ctxRedis;
+	struct redisc_server *next;
+} redisc_server_t;
+
+typedef struct redisc_reply {
+	str rname;
+	unsigned int hname;
+	redisReply *rplRedis;
+	struct redisc_reply *next;
+} redisc_reply_t;
+
+typedef struct redisc_pv {
+	str rname;
+	redisc_reply_t *reply;
+	str rkey;
+	int rkeyid;
+} redisc_pv_t;
+
+redisc_reply_t *redisc_get_reply(str *name);
+#endif