Quellcode durchsuchen

db_redis: Implement db_redis generic db driver

This module implements a generic db driver for kamailio. It
requires a "schema" and "key" definition of "tables" and corresponding
keys for redis in the kamailio config file, otherwise it's supposed to
work with every module.

Implemented methods are query (w/o order-by), insert, update, delete.

Tested with usrloc and acc.
Andreas Granig vor 7 Jahren
Ursprung
Commit
53e746b5c5

+ 1 - 1
src/Makefile.groups

@@ -153,7 +153,7 @@ mod_list_jansson=jansson
 mod_list_jansson_event=janssonrpcc
 mod_list_jansson_event=janssonrpcc
 
 
 # - modules depending on redis library
 # - modules depending on redis library
-mod_list_redis=ndb_redis topos_redis
+mod_list_redis=db_redis ndb_redis topos_redis
 
 
 # - modules depending on mono library
 # - modules depending on mono library
 mod_list_mono=app_mono
 mod_list_mono=app_mono

+ 31 - 0
src/modules/db_redis/Makefile

@@ -0,0 +1,31 @@
+#
+# WARNING: do not run this directly, it should be run by the master Makefile
+
+include ../../Makefile.defs
+auto_gen=
+NAME=db_redis.so
+
+ifeq ($(CROSS_COMPILE),)
+HIREDIS_BUILDER = $(shell \
+	if pkg-config --exists hiredis; then \
+		echo 'pkg-config hiredis'; \
+	fi)
+endif
+
+ifeq ($(HIREDIS_BUILDER),)
+	HIREDISDEFS=-I$(LOCALBASE)/include
+	HIREDISLIBS=-L$(LOCALBASE)/lib -lhiredis
+else
+	HIREDISDEFS = $(shell $(HIREDIS_BUILDER) --cflags)
+	HIREDISLIBS = $(shell $(HIREDIS_BUILDER) --libs)
+endif
+
+DEFS+=$(HIREDISDEFS)
+LIBS=$(HIREDISLIBS)
+
+DEFS+=-DKAMAILIO_MOD_INTERFACE
+
+SERLIBPATH=../../lib
+SER_LIBS=$(SERLIBPATH)/srdb2/srdb2 $(SERLIBPATH)/srdb1/srdb1
+
+include ../../Makefile.modules

+ 199 - 0
src/modules/db_redis/README

@@ -0,0 +1,199 @@
+DB_REDIS Module
+
+Andreas Granig
+
+   <[email protected]>
+
+Edited by
+
+Andreas Granig
+
+   <[email protected]>
+
+   Copyright © 2018 sipwise.com
+     __________________________________________________________________
+
+   Table of Contents
+
+   1. Admin Guide
+
+        1. Overview
+
+              1.1. Limitations
+
+        2. Dependencies
+
+              2.1. Kamailio Modules
+              2.2. Parameters
+
+                    2.2.1. schema (string)
+                    2.2.2. keys (string)
+
+              2.3. External Libraries or Applications
+
+        3. Usage
+
+   List of Examples
+
+   1.1. Setting schema module parameter
+   1.2. Setting keys module parameter
+   1.3. Usage
+
+Chapter 1. Admin Guide
+
+   Table of Contents
+
+   1. Overview
+
+        1.1. Limitations
+
+   2. Dependencies
+
+        2.1. Kamailio Modules
+        2.2. Parameters
+
+              2.2.1. schema (string)
+              2.2.2. keys (string)
+
+        2.3. External Libraries or Applications
+
+   3. Usage
+
+1. Overview
+
+   1.1. Limitations
+
+   This module provides a DB APIv1 connector for Redis server.
+
+   It can be used as a replacement for other database modules such as
+   db_mysql and db_postgres. Not all the specs of DB APIv1 are
+   implemented, thus the usage of this module might be restricted to
+   specific modules. Also, for proper performance, the module needs
+   particular configuration tailored to the using modules.
+
+   Since Redis does not provide a schema, a schema has to be defined as
+   module parameter "schema". The schema definition is composed of a
+   semi-column separated list of table definitions in format
+   <table-name>=<column-name>/<type>[<column-name>/<type> ...].
+
+   Example:
+        version=table_name/string,table_version/int;location=username/string,dom
+ain/string,contact/string,received/string,path/string,expires/timestamp,q/double
+,callid/string,cseq/int,last_modified/timestamp,flags/int,cflags/int,user_agent/
+string,socket/string,methods/int,ruid/string,reg_id/int,instance/string,server_i
+d/int,connection_id/int,keepalive/int,partition/int
+
+   Also since Redis is a key-value store with keys having to be unique,
+   tables and rows e.g. from MySQL can not be ported 1:1 to Redis. For
+   instance, usrloc relies on a key "username@domain", but it must not be
+   unique for being able to store multiple contacts per AoR. Thus,
+   db_redis supports mapping sets in a way for example for usrloc to have
+   a set with a key "username@domain", with its entries being unique keys
+   per contact being the ruid of a contact. Thus, one contact in usrloc
+   consists of a unique key "location:entry::example-ruid-1" being a hash
+   with the columns like username, domain, contact, path etc. In addition,
+   this unique key is stored in a set
+   "location:usrdom::exampleuser:exampledomain.org". When usrloc does a
+   lookup based on "username@domain", db_redis figures out via the
+   keys/values the query is constructed by usrloc to look for the final
+   entry key in the mapping set first, then querying the actual entries
+   from there, avoiding full table scans. For usrloc, the same holds true
+   for exipired contacts, requiring a different kind of mapping. There is
+   a certain balance of read performance vs. write performance to
+   consider, because inserts and deletes also have to maintain the
+   mappings, in favor of much faster selects. The mappings can be freely
+   defined, so even though other kamailio modules don't require a specific
+   mapping to be in place for proper performance, mappings could be
+   defined for external applications to read faster (for instance letting
+   the acc module also write mappings besides the actual records for
+   billing systems to correlate start and stop records faster).
+
+   The mappings can be freely defined in the "keys" module parameter. It
+   is composed of a semi-colon separated list of definitions in format
+   <table-name>=<entry>:<column-name>[&<map-name>:<column-name>,<column-na
+   me>...]
+
+   Example:
+        version=entry:table_name;location=entry:ruid&usrdom:username,domain&time
+r:partition,keepalive;acc=entry:callid,time_hires&cid:callid
+
+   Note that as of now, you have to have version information in your Redis
+   db, similar to your MySQL schema. To insert table versions (e.g. for
+   usrloc and acc), execute the following:
+        # redis-cli -h $host -n $dbnumber HMSET version:entry::location table_ve
+rsion 8
+        # redis-cli -h $host -n $dbnumber HMSET version:entry::acc table_version
+ 5
+
+   You can read more about Redis at: https://www.redis.io.
+
+1.1. Limitations
+
+     * This module has implemented the equivalent operations for INSERT,
+       UPDATE, DELETE and SELECT. The ORDER BY for SELECT is not
+       implemented. Raw query is not implemented inside this module, use
+       db_redis for sending any kind of command to a Redis server.
+
+2. Dependencies
+
+   2.1. Kamailio Modules
+   2.2. Parameters
+
+        2.2.1. schema (string)
+        2.2.2. keys (string)
+
+   2.3. External Libraries or Applications
+
+2.1. Kamailio Modules
+
+   The following modules must be loaded before this module:
+     * none.
+
+2.2. Parameters
+
+2.2.1. schema (string)
+
+   The schema of your tables.
+
+   Example 1.1. Setting schema module parameter
+modparam("db_redis", "schema", "version=table_name/string,table_version/int;loca
+tion=username/string,domain/string,contact/string,received/string,path/string,ex
+pires/timestamp,q/double,callid/string,cseq/int,last_modified/timestamp,flags/in
+t,cflags/int,user_agent/string,socket/string,methods/int,ruid/string,reg_id/int,
+instance/string,server_id/int,connection_id/int,keepalive/int,partition/int")
+
+2.2.2. keys (string)
+
+   The lookup and mapping keys of your tables.
+
+   Example 1.2. Setting keys module parameter
+modparam("db_redis", "keys", "version=entry:table_name;location=entry:ruid&usrdo
+m:username,domain&timer:partition,keepalive")
+
+2.3. External Libraries or Applications
+
+   The following libraries or applications must be installed before
+   running Kamailio with this module loaded:
+     * hiredis - available at https://github.com/redis/hiredis
+
+3. Usage
+
+   Load the module and set the the DB URL for specific modules to:
+   redis://[username]@host:port/database. Username is optional. Database
+   must be a valid redis database number.
+
+   Example 1.3. Usage
+...
+loadmodule "db_redis.so"
+...
+#!define DBURL "redis://127.0.0.1:6379/5"
+...
+modparam("db_redis", "schema", "version=table_name/string,table_version/int;loca
+tion=username/string,domain/string,contact/string,received/string,path/string,ex
+pires/timestamp,q/double,callid/string,cseq/int,last_modified/timestamp,flags/in
+t,cflags/int,user_agent/string,socket/string,methods/int,ruid/string,reg_id/int,
+instance/string,server_id/int,connection_id/int,keepalive/int,partition/int")
+modparam("db_redis", "keys", "version=entry:table_name;location=entry:ruid&usrdo
+m:username,domain&timer:partition,keepalive")
+modparam("usrloc", "db_url", DBURL)
+...

+ 105 - 0
src/modules/db_redis/db_redis_mod.c

@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2018 Andreas Granig (sipwise.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ */
+
+#define DB_REDIS_DEBUG
+
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "db_redis_mod.h"
+#include "redis_dbase.h"
+
+MODULE_VERSION
+
+str redis_keys = str_init("");
+str redis_schema = str_init("");
+
+static int db_redis_bind_api(db_func_t *dbb);
+static int mod_init(void);
+static void mod_destroy(void);
+
+static cmd_export_t cmds[] = {
+    {"db_bind_api",    (cmd_function)db_redis_bind_api,    0, 0, 0, 0},
+    {0, 0, 0, 0, 0, 0}
+};
+
+
+/*
+ * Exported parameters
+ */
+static param_export_t params[] = {
+    {"keys", PARAM_STR, &redis_keys },
+    {"schema", PARAM_STR, &redis_schema },
+    {0, 0, 0}
+};
+
+
+struct module_exports exports = {
+    "db_redis",
+    DEFAULT_DLFLAGS, /* dlopen flags */
+    cmds,
+    params,             /*  module parameters */
+    0,                  /* exported statistics */
+    0,                  /* exported MI functions */
+    0,                  /* exported pseudo-variables */
+    0,                  /* extra processes */
+    mod_init,           /* module initialization function */
+    0,                  /* response function*/
+    mod_destroy,        /* destroy function */
+    0                   /* per-child init function */
+};
+
+static int db_redis_bind_api(db_func_t *dbb) {
+    if(dbb==NULL)
+        return -1;
+
+    memset(dbb, 0, sizeof(db_func_t));
+
+    dbb->use_table        = db_redis_use_table;
+    dbb->init             = db_redis_init;
+    dbb->close            = db_redis_close;
+    dbb->query            = db_redis_query;
+    dbb->fetch_result     = 0; //db_redis_fetch_result;
+    dbb->raw_query        = 0; //db_redis_raw_query;
+    dbb->free_result      = db_redis_free_result;
+    dbb->insert           = db_redis_insert;
+    dbb->delete           = db_redis_delete;
+    dbb->update           = db_redis_update;
+    dbb->replace          = 0; //db_redis_replace;
+
+    return 0;
+}
+
+int mod_register(char *path, int *dlflags, void *p1, void *p2) {
+    if(db_api_init()<0)
+        return -1;
+    return 0;
+}
+
+static int mod_init(void) {
+    LM_DBG("module initializing\n");
+    return 0;
+}
+
+static void mod_destroy(void) {
+    LM_DBG("module destroying\n");
+}

+ 53 - 0
src/modules/db_redis/db_redis_mod.h

@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2018 Andreas Granig (sipwise.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ */
+
+
+#ifndef _DB_REDIS_MOD_H
+#define _DB_REDIS_MOD_H
+
+#include "../../lib/srdb1/db.h"
+#include "../../lib/srdb1/db_ut.h"
+#include "../../lib/srdb1/db_query.h"
+#include "../../lib/srdb1/db_pool.h"
+#include "../../lib/srdb1/db_id.h"
+#include "../../lib/srdb1/db_con.h"
+#include "../../lib/srdb1/db_res.h"
+#include "../../lib/srdb1/db_key.h"
+#include "../../lib/srdb1/db_op.h"
+#include "../../lib/srdb1/db_val.h"
+
+#include "../../core/mem/mem.h"
+
+#include "../../core/dprint.h"
+#include "../../core/sr_module.h"
+#include "../../core/str.h"
+#include "../../core/str_hash.h"
+#include "../../core/ut.h"
+
+#define REDIS_DIRECT_PREFIX "entry"
+#define REDIS_DIRECT_PREFIX_LEN 5
+
+#define REDIS_HT_SIZE 8
+
+extern str redis_keys;
+extern str redis_schema;
+
+#endif /* _DB_REDIS_MOD_H */

+ 4 - 0
src/modules/db_redis/doc/Makefile

@@ -0,0 +1,4 @@
+docs = db_redis.xml
+
+docbook_dir = ../../../docbook
+include $(docbook_dir)/Makefile.module

+ 37 - 0
src/modules/db_redis/doc/db_redis.xml

@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+
+<book xmlns:xi="http://www.w3.org/2001/XInclude">
+    <bookinfo>
+	<title>DB_REDIS Module</title>
+	<productname class="trade">kamailio.org</productname>
+	<authorgroup>
+	    <author>
+		<firstname>Andreas</firstname>
+		<surname>Granig</surname>
+		<email>[email protected]</email>
+	    </author>
+	    <editor>
+		<firstname>Andreas</firstname>
+		<surname>Granig</surname>
+		<email>[email protected]</email>
+	    </editor>
+	</authorgroup>
+	<copyright>
+	    <year>2018</year>
+	    <holder>sipwise.com</holder>
+	</copyright>
+    </bookinfo>
+    <toc></toc>
+    
+    <xi:include href="db_redis_admin.xml"/>
+    
+    
+</book>

+ 190 - 0
src/modules/db_redis/doc/db_redis_admin.xml

@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+<!-- Module User's Guide -->
+
+<chapter>
+	
+	<title>&adminguide;</title>
+	
+	<section>
+	<title>Overview</title>
+	<para>
+		This module provides a DB APIv1 connector for Redis server.
+	</para>
+	<para>
+	It can be used as a replacement for other database modules such as
+	db_mysql and db_postgres. Not all the specs of DB APIv1 are
+	implemented, thus the usage of this module might be restricted to
+	specific modules. Also, for proper performance, the module needs
+	particular configuration tailored to the using modules.
+	</para>
+	<para>
+	Since Redis does not provide a schema, a schema has to be defined as
+	module parameter "schema". The schema definition is composed of a
+	semi-column	separated list of table definitions in format
+	&lt;table-name&gt;=&lt;column-name&gt;/&lt;type&gt;[,&lt;column-name&gt;/&lt;type&gt; ...].
+	</para>
+	<para>
+	Example:
+	<programlisting format="linespecific">
+	version=table_name/string,table_version/int;location=username/string,domain/string,contact/string,received/string,path/string,expires/timestamp,q/double,callid/string,cseq/int,last_modified/timestamp,flags/int,cflags/int,user_agent/string,socket/string,methods/int,ruid/string,reg_id/int,instance/string,server_id/int,connection_id/int,keepalive/int,partition/int
+	</programlisting>
+	</para>
+	<para>
+	Also since Redis is a key-value store with keys having to be unique,
+	tables and rows e.g. from MySQL can not be ported 1:1 to Redis. For
+	instance, usrloc relies on a key "username@domain", but it must not be
+	unique for being able to store multiple contacts per AoR. Thus, db_redis
+	supports mapping sets in a way for example for usrloc to have a set with
+	a key "username@domain", with its entries being unique keys per contact being
+	the ruid of a contact. Thus, one contact in usrloc consists of a unique
+	key "location:entry::example-ruid-1" being a hash with the columns like
+	username, domain, contact, path etc. In addition, this unique key is stored
+	in a set "location:usrdom::exampleuser:exampledomain.org". When usrloc does
+	a lookup based on "username@domain", db_redis figures out via the keys/values
+	the query is constructed by usrloc to look for the final entry key in the
+	mapping set first, then querying the actual entries from there, avoiding full
+	table scans. For usrloc, the same holds true for expired contacts, requiring
+	a different kind of mapping. There is a certain balance of read performance
+	vs. write performance to consider, because inserts and deletes also have to
+	maintain the mappings, in favor of much faster selects. The mappings can be
+	freely defined, so even though other kamailio modules don't require a specific
+	mapping to be in place for proper performance, mappings could be defined
+	for external applications to read faster (for instance letting the acc module
+	also write mappings besides the actual records for billing systems to
+	correlate start and stop records faster).
+	</para>
+	<para>
+	The mappings can be freely defined in the "keys" module parameter. It is
+	composed of a semi-colon separated list of definitions in format
+	&lt;table-name&gt;=&lt;entry&gt;:&lt;column-name&gt;[&amp;&lt;map-name&gt;:&lt;column-name&gt;,&lt;column-name&gt;...].
+	Each table must at least have an "entry" key for db_redis to be able to store data.
+	</para>
+	<para>
+	Example:
+	<programlisting format="linespecific">
+	version=entry:table_name;location=entry:ruid&amp;usrdom:username,domain&amp;timer:partition,keepalive;acc=entry:callid,time_hires&amp;cid:callid
+	</programlisting>
+	</para>
+	<para>
+	Note that as of now, you have to have version information in your Redis db,
+	similar to your MySQL schema. To insert table versions (e.g. for usrloc and acc),
+	execute the following:
+	</para>
+	<programlisting format="linespecific">
+	# redis-cli -h $host -n $dbnumber HMSET version:entry::location table_version 8
+	# redis-cli -h $host -n $dbnumber HMSET version:entry::acc table_version 5
+	</programlisting>
+
+	<para>
+		You can read more about Redis at:
+		<ulink url="https://www.redis.io">https://www.redis.io</ulink>.
+	</para>
+
+	<section>
+	<title>Limitations</title>
+	<itemizedlist>
+	<listitem>
+	<para>
+		This module has implemented the equivalent operations for INSERT,
+		UPDATE, DELETE and SELECT. The ORDER BY for SELECT is not implemented.
+		Raw query is not implemented inside this module, use ndb_redis for sending any
+		kind of command to a Redis server.
+	</para>
+	</listitem>
+	</itemizedlist>
+	</section>
+	</section>
+
+	<section>
+	<title>Dependencies</title>
+	<section>
+		<title>&kamailio; Modules</title>
+		<para>
+		The following modules must be loaded before this module:
+			<itemizedlist>
+			<listitem>
+			<para>
+				<emphasis>none</emphasis>.
+			</para>
+			</listitem>
+			</itemizedlist>
+		</para>
+	</section>
+
+	<section>
+	<title>Parameters</title>
+	<section>
+		<title><varname>schema</varname> (string)</title>
+		<para>
+		The schema of your tables.
+		</para>
+		<example>
+		<title>Setting schema module parameter</title>
+		<programlisting format="linespecific">
+modparam("db_redis", "schema", "version=table_name/string,table_version/int;location=username/string,domain/string,contact/string,received/string,path/string,expires/timestamp,q/double,callid/string,cseq/int,last_modified/timestamp,flags/int,cflags/int,user_agent/string,socket/string,methods/int,ruid/string,reg_id/int,instance/string,server_id/int,connection_id/int,keepalive/int,partition/int")
+		</programlisting>
+		</example>
+	</section>
+	<section>
+		<title><varname>keys</varname> (string)</title>
+		<para>
+		The entry and mapping keys of your tables.
+		</para>
+		<example>
+		<title>Setting keys module parameter</title>
+		<programlisting format="linespecific">
+modparam("db_redis", "keys", "version=entry:table_name;location=entry:ruid&amp;usrdom:username,domain&amp;timer:partition,keepalive")
+		</programlisting>
+		</example>
+	</section>
+	</section>
+
+	<section>
+		<title>External Libraries or Applications</title>
+		<para>
+		The following libraries or applications must be installed before running
+		&kamailio; with this module loaded:
+			<itemizedlist>
+			<listitem>
+			<para>
+				<emphasis>hiredis</emphasis> - available at
+				<ulink url="https://github.com/redis/hiredis">https://github.com/redis/hiredis</ulink>
+			</para>
+			</listitem>
+			</itemizedlist>
+		</para>
+	</section>
+	</section>
+
+	<section>
+	<title>Usage</title>
+		<para>
+			Load the module and set the the DB URL for specific modules to:
+			redis://[username]@host:port/database. Username is optional.
+			Database must be a valid redis database number.
+		</para>
+		<example>
+		<title>Usage</title>
+		<programlisting format="linespecific">
+...
+loadmodule "db_redis.so"
+...
+#!define DBURL "redis://127.0.0.1:6379/5"
+...
+modparam("db_redis", "schema", "version=table_name/string,table_version/int;location=username/string,domain/string,contact/string,received/string,path/string,expires/timestamp,q/double,callid/string,cseq/int,last_modified/timestamp,flags/int,cflags/int,user_agent/string,socket/string,methods/int,ruid/string,reg_id/int,instance/string,server_id/int,connection_id/int,keepalive/int,partition/int")
+modparam("db_redis", "keys", "version=entry:table_name;location=entry:ruid&amp;usrdom:username,domain&amp;timer:partition,keepalive")
+modparam("usrloc", "db_url", DBURL)
+...
+</programlisting>
+		</example>
+	</section>
+</chapter>
+

+ 293 - 0
src/modules/db_redis/redis_connection.c

@@ -0,0 +1,293 @@
+/*
+ * Copyright (C) 2018 Andreas Granig (sipwise.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ */
+
+#include <stdlib.h>
+
+#include "db_redis_mod.h"
+#include "redis_connection.h"
+#include "redis_table.h"
+
+int db_redis_connect(km_redis_con_t *con) {
+    struct timeval tv;
+    redisReply *reply;
+    int db;
+
+    tv.tv_sec = 1;
+    tv.tv_usec = 0;
+
+    db = atoi(con->id->database);
+    reply = NULL;
+
+    // TODO: introduce require_master mod-param and check if we're indeed master
+    // TODO: on carrier, if we have db fail-over, the currently connected
+    // redis server will become slave without dropping connections?
+
+    con->con = redisConnectWithTimeout(con->id->host, con->id->port, tv);
+
+    if (!con->con) {
+        LM_ERR("cannot open connection: %.*s\n", con->id->url.len, con->id->url.s);
+        goto err;
+    }
+    if (con->con->err) {
+        LM_ERR("cannot open connection to %.*s: %s\n", con->id->url.len, con->id->url.s,
+            con->con->errstr);
+        goto err;
+    }
+
+    if (con->id->password) {
+        reply = redisCommand(con->con, "AUTH %s", con->id->password);
+        if (!reply) {
+            LM_ERR("cannot authenticate connection %.*s: %s\n",
+                    con->id->url.len, con->id->url.s, con->con->errstr);
+            goto err;
+        }
+        if (reply->type == REDIS_REPLY_ERROR) {
+            LM_ERR("cannot authenticate connection %.*s: %s\n",
+                    con->id->url.len, con->id->url.s, reply->str);
+            goto err;
+        }
+        freeReplyObject(reply); reply = NULL;
+    }
+
+    reply = redisCommand(con->con, "PING");
+    if (!reply) {
+        LM_ERR("cannot ping server on connection %.*s: %s\n",
+                con->id->url.len, con->id->url.s, con->con->errstr);
+        goto err;
+    }
+    if (reply->type == REDIS_REPLY_ERROR) {
+        LM_ERR("cannot ping server on connection %.*s: %s\n",
+                con->id->url.len, con->id->url.s, reply->str);
+        goto err;
+    }
+    freeReplyObject(reply); reply = NULL;
+
+    reply = redisCommand(con->con, "SELECT %i", db);
+    if (!reply) {
+        LM_ERR("cannot select db on connection %.*s: %s\n",
+                con->id->url.len, con->id->url.s, con->con->errstr);
+        goto err;
+    }
+    if (reply->type == REDIS_REPLY_ERROR) {
+        LM_ERR("cannot select db on connection %.*s: %s\n",
+                con->id->url.len, con->id->url.s, reply->str);
+        goto err;
+    }
+    freeReplyObject(reply); reply = NULL;
+
+    return 0;
+
+err:
+    if (reply)
+        freeReplyObject(reply);
+    if (con->con) {
+        redisFree(con->con);
+        con->con = NULL;
+    }
+    return -1;
+}
+
+/*! \brief
+ * Create a new connection structure,
+ * open the redis connection and set reference count to 1
+ */
+km_redis_con_t* db_redis_new_connection(const struct db_id* id) {
+    km_redis_con_t *ptr = NULL;
+
+    if (!id) {
+        LM_ERR("invalid id parameter value\n");
+        return 0;
+    }
+
+    ptr = (km_redis_con_t*)pkg_malloc(sizeof(km_redis_con_t));
+    if (!ptr) {
+        LM_ERR("no private memory left\n");
+        return 0;
+    }
+    memset(ptr, 0, sizeof(km_redis_con_t));
+    ptr->id = (struct db_id*)id;
+
+    /*
+    LM_DBG("trying to initialize connection to '%.*s' with schema '%.*s' and keys '%.*s'\n",
+            id->url.len, id->url.s,
+            redis_schema.len, redis_schema.s,
+            redis_keys.len, redis_keys.s);
+    */
+    LM_DBG("trying to initialize connection to '%.*s'\n",
+            id->url.len, id->url.s);
+    if (db_redis_parse_schema(ptr) != 0) {
+        LM_ERR("failed to parse 'schema' module parameter\n");
+        goto err;
+    }
+    if (db_redis_parse_keys(ptr) != 0) {
+        LM_ERR("failed to parse 'keys' module parameter\n");
+        goto err;
+    }
+
+    db_redis_print_all_tables(ptr);
+
+    ptr->ref = 1;
+    ptr->append_counter = 0;
+
+    if (db_redis_connect(ptr) != 0) {
+        LM_ERR("Failed to connect to redis db\n");
+        goto err;
+    }
+
+    LM_DBG("connection opened to %.*s\n", id->url.len, id->url.s);
+
+    return ptr;
+
+ err:
+    if (ptr) {
+        if (ptr->con) {
+            redisFree(ptr->con);
+        }
+        pkg_free(ptr);
+    }
+    return 0;
+}
+
+
+/*! \brief
+ * Close the connection and release memory
+ */
+void db_redis_free_connection(struct pool_con* con) {
+    km_redis_con_t * _c;
+
+    LM_DBG("freeing db_redis connection\n");
+
+    if (!con) return;
+
+    _c = (km_redis_con_t*) con;
+
+    if (_c->id) free_db_id(_c->id);
+    if (_c->con) {
+        redisFree(_c->con);
+    }
+
+    db_redis_free_tables(_c);
+    pkg_free(_c);
+}
+
+
+static void print_query(redis_key_t *query) {
+    LM_DBG("Query dump:\n");
+    for (redis_key_t *k = query; k; k = k->next) {
+        LM_DBG("  %s\n", k->key.s);
+    }
+}
+
+void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
+    char **argv = NULL;
+    int argc;
+
+    print_query(query);
+
+    argc = db_redis_key_list2arr(query, &argv);
+    if (argc < 0) {
+        LM_ERR("Failed to allocate memory for query array\n");
+        return NULL;
+    }
+    LM_DBG("query has %d args\n", argc);
+
+    redisReply *reply = redisCommandArgv(con->con, argc, (const char**)argv, NULL);
+    if (con->con->err == REDIS_ERR_EOF &&
+        strcmp(con->con->errstr,"Server closed the connection") == 0) {
+
+        if (db_redis_connect(con) != 0) {
+            LM_ERR("Failed to reconnect to redis db\n");
+            pkg_free(argv);
+            return NULL;
+        }
+        reply = redisCommandArgv(con->con, argc, (const char**)argv, NULL);
+    }
+    pkg_free(argv);
+    return reply;
+}
+
+int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query) {
+    char **argv = NULL;
+    int ret, argc;
+
+    print_query(query);
+
+    argc = db_redis_key_list2arr(query, &argv);
+    if (argc < 0) {
+        LM_ERR("Failed to allocate memory for query array\n");
+        return -1;
+    }
+    LM_DBG("query has %d args\n", argc);
+
+    ret = redisAppendCommandArgv(con->con, argc, (const char**)argv, NULL);
+    if (con->con->err == REDIS_ERR_EOF &&
+        strcmp(con->con->errstr,"Server closed the connection") == 0) {
+
+        if (db_redis_connect(con) != 0) {
+            LM_ERR("Failed to reconnect to redis db\n");
+            pkg_free(argv);
+            return ret;
+        }
+        ret = redisAppendCommandArgv(con->con, argc, (const char**)argv, NULL);
+    }
+    pkg_free(argv);
+    if (!con->con->err) {
+        con->append_counter++;
+    }
+    return ret;
+}
+
+int db_redis_get_reply(km_redis_con_t *con, void **reply) {
+    int ret;
+
+    *reply = NULL;
+    ret = redisGetReply(con->con, reply);
+    if (con->con->err == REDIS_ERR_EOF &&
+        strcmp(con->con->errstr,"Server closed the connection") == 0) {
+
+        if (db_redis_connect(con) != 0) {
+            LM_ERR("Failed to reconnect to redis db\n");
+            return ret;
+        }
+        ret = redisGetReply(con->con, reply);
+    }
+    if (!con->con->err)
+        con->append_counter--;
+    return ret;
+}
+
+void db_redis_free_reply(redisReply **reply) {
+    if (reply && *reply) {
+        freeReplyObject(*reply);
+        *reply = NULL;
+    }
+}
+
+void db_redis_consume_replies(km_redis_con_t *con) {
+    redisReply *reply = NULL;
+    while (con->append_counter > 0 && !con->con->err) {
+        LM_DBG("consuming outstanding reply %u", con->append_counter);
+        db_redis_get_reply(con, (void**)&reply);
+        if (reply) {
+            freeReplyObject(reply);
+            reply = NULL;
+        }
+    }
+}

+ 73 - 0
src/modules/db_redis/redis_connection.h

@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2018 Andreas Granig (sipwise.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ */
+
+
+#ifndef _REDIS_CONNECTION_H_
+#define _REDIS_CONNECTION_H_
+
+#include <hiredis.h>
+
+#include "db_redis_mod.h"
+
+#define db_redis_check_reply(con, reply, err) do { \
+    if (!(reply) && !(con)->con) { \
+        LM_ERR("Failed to fetch type entry: no connection to server\n"); \
+        goto err; \
+    } \
+    if (!(reply)) { \
+        LM_ERR("Failed to fetch type entry: %s\n", \
+                (con)->con->errstr); \
+        goto err; \
+    } \
+    if ((reply)->type == REDIS_REPLY_ERROR) { \
+        LM_ERR("Failed to fetch type entry: %s\n", \
+                (reply)->str); \
+        goto err; \
+    } \
+} while(0);
+
+typedef struct km_redis_con {
+    struct db_id* id;
+    unsigned int ref;
+    struct pool_con* next;
+
+    redisContext *con;
+    unsigned int append_counter;
+    struct str_hash_table tables;
+} km_redis_con_t;
+
+
+struct redis_key;
+typedef struct redis_key redis_key_t;
+
+#define REDIS_CON(db_con)  ((km_redis_con_t*)((db_con)->tail))
+
+km_redis_con_t* db_redis_new_connection(const struct db_id* id);
+void db_redis_free_connection(struct pool_con* con);
+
+int db_redis_connect(km_redis_con_t *con);
+void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query);
+int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query);
+int db_redis_get_reply(km_redis_con_t *con, void **reply);
+void db_redis_consume_replies(km_redis_con_t *con);
+void db_redis_free_reply(redisReply **reply);
+
+#endif /* _REDIS_CONNECTION_H_ */

+ 2058 - 0
src/modules/db_redis/redis_dbase.c

@@ -0,0 +1,2058 @@
+/*
+ * Copyright (C) 2018 Andreas Granig (sipwise.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ */
+
+#include <stdlib.h>
+#include <time.h>
+
+#include "db_redis_mod.h"
+#include "redis_connection.h"
+#include "redis_dbase.h"
+#include "redis_table.h"
+
+static void db_redis_dump_reply(redisReply *reply) {
+    if (reply->type == REDIS_REPLY_STRING) {
+        LM_DBG("%s\n", reply->str);
+    } else if (reply->type == REDIS_REPLY_INTEGER) {
+        LM_DBG("%lld\n", reply->integer);
+    } else if (reply->type == REDIS_REPLY_NIL) {
+        LM_DBG("<null>\n");
+    } else if (reply->type == REDIS_REPLY_ARRAY) {
+        LM_DBG("printing %lu elements in array reply\n", reply->elements);
+        for(int i = 0; i < reply->elements; ++i) {
+            db_redis_dump_reply(reply->element[i]);
+        }
+    } else {
+        LM_DBG("not printing invalid reply type\n");
+    }
+}
+
+// TODO: utilize auto-expiry? on insert/update, also update expire value
+// of mappings
+
+/*
+ * Initialize database module
+ * No function should be called before this
+ */
+db1_con_t* db_redis_init(const str* _url) {
+    return db_do_init(_url, (void *)db_redis_new_connection);
+}
+
+/*
+ * Shut down database module
+ * No function should be called after this
+ */
+void db_redis_close(db1_con_t* _h) {
+    LM_DBG("closing redis db connection\n");
+    db_do_close(_h, db_redis_free_connection);
+}
+
+static int db_redis_val2str(const db_val_t *v, str *_str) {
+    const char *s;
+    const str *tmpstr;
+    int vtype = VAL_TYPE(v);
+    _str->s = NULL;
+    _str->len = 32; // default for numbers
+
+    if (VAL_NULL(v)) {
+        LM_DBG("converting <null> value to str\n");
+        _str->len = 0;
+        return 0;
+    }
+
+    switch (vtype) {
+        case DB1_INT:
+            LM_DBG("converting int value %d to str\n", VAL_INT(v));
+            _str->s = (char*)pkg_malloc(_str->len);
+            if (!_str->s) goto memerr;
+            snprintf(_str->s, _str->len, "%d", VAL_INT(v));
+            _str->len = strlen(_str->s);
+            break;
+        case DB1_BIGINT:
+            LM_DBG("converting bigint value %lld to str\n", VAL_BIGINT(v));
+            _str->s = (char*)pkg_malloc(_str->len);
+            if (!_str->s) goto memerr;
+            snprintf(_str->s, _str->len, "%lld", VAL_BIGINT(v));
+            _str->len = strlen(_str->s);
+            break;
+        case DB1_STRING:
+            s = VAL_STRING(v);
+            _str->len = strlen(s);
+            LM_DBG("converting string value '%s' with len %d to str\n", s, _str->len);
+            _str->s = (char*)pkg_malloc(_str->len + 1);
+            if (!_str->s) goto memerr;
+            //memcpy(_str->s, s, _str->len);
+            //_str->s[_str->len] = '\0';
+            memset(_str->s, 0, _str->len + 1);
+            strncpy(_str->s, s, _str->len);
+            break;
+        case DB1_STR:
+            tmpstr = &(VAL_STR(v));
+            LM_DBG("converting str value '%.*s' with len %d to str\n", tmpstr->len, tmpstr->s, tmpstr->len);
+            // copy manually to add 0 termination
+            _str->s = (char*)pkg_malloc(tmpstr->len + 1);
+            if (!_str->s) goto memerr;
+            _str->len = tmpstr->len;
+            memcpy(_str->s, tmpstr->s, _str->len);
+            _str->s[_str->len] = '\0';
+            break;
+        case DB1_DATETIME:
+            LM_DBG("converting datetime value %ld to str\n", VAL_TIME(v));
+            _str->s = (char*)pkg_malloc(_str->len);
+            if (!_str->s) goto memerr;
+            strftime(_str->s, _str->len, "%Y-%m-%d %H:%M:%S", localtime(&(VAL_TIME(v))));
+            _str->len = strlen(_str->s);
+            break;
+        case DB1_DOUBLE:
+            LM_DBG("converting double value %f to str\n", VAL_DOUBLE(v));
+            _str->s = (char*)pkg_malloc(_str->len);
+            if (!_str->s) goto memerr;
+            snprintf(_str->s, _str->len, "%.6f", VAL_DOUBLE(v));
+            _str->len = strlen(_str->s);
+            break;
+        case DB1_BITMAP:
+            LM_DBG("converting bitmap value %u to str\n", VAL_BITMAP(v));
+            _str->s = (char*)pkg_malloc(_str->len);
+            if (!_str->s) goto memerr;
+            snprintf(_str->s, _str->len, "%u", VAL_BITMAP(v));
+            _str->len = strlen(_str->s);
+            break;
+        case DB1_BLOB:
+        default:
+            LM_ERR("Unsupported val type %d\n", vtype);
+            goto err;
+    }
+
+    return 0;
+
+memerr:
+    LM_ERR("Failed to allocate memory to convert value to string\n");
+err:
+    return -1;
+}
+
+static int db_redis_build_entry_manual_keys(redis_table_t *table, const db_key_t *_k, const db_val_t *_v, const int _n, int **manual_keys, int *manual_key_count) {
+
+    // TODO: we also put keys here which are already part of type mapping!
+    // there must be removed for performance reasons
+
+    redis_key_t *key = NULL;
+
+    *manual_keys = (int*)pkg_malloc(_n * sizeof(int));
+    if (! *manual_keys) {
+        LM_ERR("Failed to allocate memory for manual key indices\n");
+        goto err;
+    }
+    memset(*manual_keys, 0, _n * sizeof(int));
+    *manual_key_count = 0;
+
+    for (key = table->entry_keys; key; key = key->next) {
+        int subkey_found = 0;
+        LM_DBG("checking for existence of entry key '%.*s' in query to get manual key\n",
+                key->key.len, key->key.s);
+        for (int i = 0; i < _n; ++i) {
+            const db_key_t k = _k[i];
+            if (!str_strcmp(&key->key, (str*)k)) {
+                LM_DBG("found key in entry key\n");
+                subkey_found = 1;
+                break;
+            } else {
+                (*manual_keys)[*manual_key_count] = i;
+                (*manual_key_count)++;
+            }
+        }
+        if (!subkey_found) {
+            break;
+        }
+    }
+    return 0;
+
+err:
+    if (*manual_keys) {
+        pkg_free(*manual_keys);
+        *manual_keys = NULL;
+    }
+    return -1;
+}
+
+static int db_redis_find_query_key(redis_key_t *key, const str *table_name, str *type_name, const db_key_t *_k, const db_val_t *_v, const int _n, str *key_name, int *key_found) {
+
+    unsigned int len;
+    str val = {NULL, 0};
+
+    *key_found = 1;
+    key_name->len = 0;
+    key_name->s = NULL;
+
+    for (; key; key = key->next) {
+        int subkey_found = 0;
+        LM_DBG("checking for existence of entry key '%.*s' in query\n",
+                key->key.len, key->key.s);
+        for (int i = 0; i < _n; ++i) {
+            const db_key_t k = _k[i];
+            const db_val_t v = _v[i];
+
+            if (VAL_NULL(&v)) {
+                LM_DBG("Skipping null value for given key '%.*s'\n",
+                        k->len, k->s);
+                break;
+            } else if (!str_strcmp(&key->key, (str*)k)) {
+                LM_DBG("found key in entry key\n");
+                if (db_redis_val2str(&v, &val) != 0) goto err;
+                if (val.s == NULL) {
+                    LM_DBG("key value in entry key is null, skip key\n");
+                    subkey_found = 0;
+                    break;
+                }
+                if (!key_name->len) {
+                    // <table_name>:<type>::<val>
+                    len = table_name->len + 1 + type_name->len + 2 + val.len + 1; //snprintf writes term 0 char
+                    key_name->s = (char*)pkg_malloc(len);
+                    if (!key_name->s) {
+                        LM_ERR("Failed to allocate key memory\n");
+                        goto err;
+                    }
+                    snprintf(key_name->s, len, "%.*s:%.*s::%.*s",
+                            table_name->len, table_name->s,
+                            type_name->len, type_name->s,
+                            val.len, val.s);
+                    key_name->len = len-1; // subtract the term 0 char
+
+                } else {
+                    // :<val>
+                    key_name->s = (char*)pkg_realloc(key_name->s, key_name->len + val.len + 2);
+                    if (!key_name->s) {
+                        LM_ERR("Failed to allocate key memory\n");
+                        goto err;
+                    }
+                    snprintf(key_name->s + key_name->len, 1 + val.len + 1, ":%.*s",
+                            val.len, val.s);
+                    key_name->len += (1 + val.len);
+                }
+                LM_DBG("entry key so far is '%.*s'\n", key_name->len, key_name->s);
+                subkey_found = 1;
+                pkg_free(val.s);
+                val.s = NULL;
+                break;
+            }
+        }
+        if (!subkey_found) {
+            LM_DBG("key '%.*s' for type '%.*s' not found, unable to use this type\n",
+                    key->key.len, key->key.s, type_name->len, type_name->s);
+            if (key_name->s) {
+                pkg_free(key_name->s);
+                key_name->s = NULL;
+                key_name->len = 0;
+            }
+            *key_found = 0;
+            break;
+        }
+    }
+
+    return 0;
+
+err:
+    if (val.s)
+        pkg_free(val.s);
+    if(key_name->s) {
+        pkg_free(key_name->s);
+        key_name->s = NULL;
+        key_name->len = 0;
+    }
+    return -1;
+}
+
+static int db_redis_build_entry_keys(km_redis_con_t *con, const str *table_name,
+        const db_key_t *_k, const db_val_t *_v, const int _n,
+        redis_key_t **keys, int *keys_count) {
+
+    struct str_hash_entry *table_e;
+    redis_table_t *table;
+    redis_key_t *key;
+    int key_found;
+    str type_name = str_init("entry");
+    str keyname = {NULL, 0};
+
+    LM_DBG("build entry keys\n");
+
+    table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
+    if (!table_e) {
+        LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!",
+                table_name->len, table_name->s);
+        return -1;
+    }
+    table = (redis_table_t*)table_e->u.p;
+    key = table->entry_keys;
+    if (db_redis_find_query_key(key, table_name, &type_name, _k, _v, _n, &keyname, &key_found) != 0) {
+        goto err;
+    }
+    if (key_found) {
+        db_redis_key_add_str(keys, &keyname);
+
+        if (db_redis_key_add_str(keys, &keyname) != 0) {
+            LM_ERR("Failed to add key string\n");
+            goto err;
+        }
+        LM_DBG("found suitable entry key '%.*s' for query\n",
+                (*keys)->key.len, (*keys)->key.s);
+        *keys_count = 1;
+        pkg_free(keyname.s);
+    } else {
+        LM_ERR("Failed to create direct entry key, no matching key definition\n");
+        goto err;
+    }
+
+    return 0;
+
+err:
+    db_redis_key_free(keys);
+    if (keyname.s)
+        pkg_free(keyname.s);
+    return -1;
+}
+
+static int db_redis_get_keys_for_all_types(km_redis_con_t *con, const str *table_name,
+        redis_key_t **keys, int *keys_count) {
+
+    struct str_hash_entry *table_e;
+    redis_table_t *table;
+    redis_type_t *type;
+    redis_key_t *key;
+
+    *keys = NULL;
+    *keys_count = 0;
+
+    table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
+    if (!table_e) {
+        LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!",
+                table_name->len, table_name->s);
+        return -1;
+    }
+    table = (redis_table_t*)table_e->u.p;
+
+    for (type = table->types; type; type = type->next) {
+        for (key = type->keys; key; key = key->next) {
+            if (db_redis_key_add_str(keys, &key->key) != 0) {
+                LM_ERR("Failed to add key string\n");
+                goto err;
+            }
+            (*keys_count)++;
+        }
+    }
+
+    return 0;
+
+err:
+    db_redis_key_free(keys);
+    return -1;
+}
+
+static int db_redis_build_type_keys(km_redis_con_t *con, const str *table_name,
+        const db_key_t *_k, const db_val_t *_v, const int _n,
+        redis_key_t **keys, int *keys_count) {
+
+    struct str_hash_entry *table_e;
+    redis_table_t *table;
+    redis_type_t *type;
+    redis_key_t *key;
+
+    *keys = NULL;
+    *keys_count = 0;
+
+    LM_DBG("build type keys\n");
+
+    table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
+    if (!table_e) {
+        LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!",
+                table_name->len, table_name->s);
+        return -1;
+    }
+    table = (redis_table_t*)table_e->u.p;
+
+    for (type = table->types; type; type = type->next) {
+        str *type_name = &(type->type);
+        int key_found = 0;
+        str keyname = {NULL, 0};
+        key = type->keys;
+
+        if (db_redis_find_query_key(key, table_name, &type->type, _k, _v, _n, &keyname, &key_found) != 0) {
+            goto err;
+        }
+        if (key_found) {
+            db_redis_key_add_str(keys, &keyname);
+            (*keys_count)++;
+            LM_DBG("found key '%.*s' for type '%.*s'\n",
+                    keyname.len, keyname.s,
+                    type_name->len, type_name->s);
+            pkg_free(keyname.s);
+        }
+    }
+
+    return 0;
+
+err:
+    LM_ERR("Failed to get type key\n");
+    db_redis_key_free(keys);
+    return -1;
+}
+
+static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name,
+        const db_key_t *_k, const db_val_t *_v, const db_op_t *_op, const int _n,
+        redis_key_t **query_keys, int *query_keys_count, int **manual_keys, int *manual_keys_count,
+        int *do_table_scan) {
+
+    struct str_hash_entry *table_e;
+    redis_table_t *table;
+    redis_type_t *type;
+    redis_key_t *key;
+    str keyname;
+    int key_found;
+    redisReply *reply = NULL;
+    str typename = str_init(REDIS_DIRECT_PREFIX);
+
+    *query_keys = NULL;
+    *query_keys_count = 0;
+    *do_table_scan = 1;
+
+    LM_DBG("build query keys\n");
+
+    table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
+    if (!table_e) {
+        LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!",
+                table_name->len, table_name->s);
+        return -1;
+    }
+    table = (redis_table_t*)table_e->u.p;
+
+    // check if given keys directly match entry key
+    keyname.s = NULL;
+    keyname.len = 0;
+    key = table->entry_keys;
+
+    if (db_redis_find_query_key(key, table_name, &typename, _k, _v, _n, &keyname, &key_found) != 0) {
+        goto err;
+    }
+    if (key_found) {
+        LM_DBG("found suitable entry key '%.*s' for query\n",
+                keyname.len, keyname.s);
+        db_redis_key_add_str(query_keys, &keyname);
+        *query_keys_count = 1;
+        pkg_free(keyname.s);
+        keyname.s = NULL;
+    } else {
+        LM_DBG("no direct entry key found, checking type keys\n");
+        for (type = table->types; type; type = type->next) {
+            key = type->keys;
+            LM_DBG("checking type '%.*s'\n", type->type.len, type->type.s);
+            if (db_redis_find_query_key(key, table_name, &type->type, _k, _v, _n, &keyname, &key_found) != 0) {
+                goto err;
+            }
+            if (key_found) {
+                redis_key_t *query_v = NULL;
+                char *prefix = "SMEMBERS";
+
+                if (db_redis_key_add_string(&query_v, prefix, strlen(prefix)) != 0) {
+                    LM_ERR("Failed to add smembers command to query\n");
+                    goto err;
+                }
+                if (db_redis_key_add_str(&query_v, &keyname) != 0) {
+                    LM_ERR("Failed to add key name to smembers query\n");
+                    goto err;
+                }
+
+                reply = db_redis_command_argv(con, query_v);
+                pkg_free(keyname.s);
+                keyname.s = NULL;
+                db_redis_key_free(&query_v);
+                db_redis_check_reply(con, reply, err);
+                if (reply->type == REDIS_REPLY_ARRAY) {
+                    if (reply->elements == 0) {
+                        LM_DBG("type query returned empty list\n");
+                        *query_keys_count = 0;
+                        *do_table_scan = 0;
+                        db_redis_free_reply(&reply);
+                        break;
+                    } else {
+                        LM_DBG("populating query keys list with result of type query\n");
+                        *query_keys_count = reply->elements;
+                        for (int i = 0; i < reply->elements; ++i) {
+                            redisReply *subreply = reply->element[i];
+                            if (subreply->type == REDIS_REPLY_STRING) {
+                                LM_DBG("adding resulting entry key '%s' from type query\n", subreply->str);
+                                if (db_redis_key_add_string(query_keys, subreply->str, strlen(subreply->str)) != 0) {
+                                    LM_ERR("Failed to add query key\n");
+                                    goto err;
+                                }
+                            } else {
+                                LM_ERR("Unexpected entry key type in type query, expecting a string\n");
+                                goto err;
+                            }
+                        }
+                    }
+                } else {
+                    LM_ERR("Unexpected reply for type query, expecting an array\n");
+                    goto err;
+                }
+
+                db_redis_free_reply(&reply);
+                break;
+            }
+        }
+    }
+
+    if (*query_keys_count > 0) {
+        LM_DBG("building manual keys\n");
+        if (db_redis_build_entry_manual_keys(table, _k, _v, _n, manual_keys, manual_keys_count) != 0) {
+            LM_ERR("Failed to build manual entry key list\n");
+            goto err;
+
+        }
+    }
+
+    return 0;
+err:
+    if (keyname.s) {
+        pkg_free(keyname.s);
+        keyname.s = NULL;
+    }
+    if (reply) {
+        db_redis_free_reply(&reply);
+    }
+    db_redis_key_free(query_keys);
+    if (*manual_keys) {
+        pkg_free(*manual_keys);
+        *manual_keys = NULL;
+    }
+    return -1;
+}
+
+static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
+        const db_key_t *_k, const int _n,
+        redis_key_t **query_keys, int *query_keys_count,
+        int **manual_keys, int *manual_keys_count) {
+
+    size_t i = 0;
+    redis_key_t *query_v = NULL;
+    char cursor_str[32] = "";
+    redisReply *reply = NULL;
+    unsigned long cursor = 0;
+    char *match = NULL;
+
+    str match_pattern = {":entry::*", strlen(":entry::*")};
+
+    *query_keys = NULL;
+    *query_keys_count = 0;
+    *manual_keys = NULL;
+    *manual_keys_count = 0;
+
+    do {
+        snprintf(cursor_str, sizeof(cursor_str), "%lu", cursor);
+        match = (char*)pkg_malloc(table_name->len + match_pattern.len + 1);
+        if (!match) {
+            LM_ERR("Failed to allocate memory for match pattern\n");
+            goto err;
+        }
+        snprintf(match, table_name->len + match_pattern.len + 1, "%s%s\n",
+                table_name->s, match_pattern.s);
+
+        if (db_redis_key_add_string(&query_v, "SCAN", 4) != 0) {
+            LM_ERR("Failed to add scan command to scan query\n");
+            goto err;
+        }
+        if (db_redis_key_add_string(&query_v, cursor_str, strlen(cursor_str)) != 0) {
+            LM_ERR("Failed to add cursor to scan query\n");
+            goto err;
+        }
+        if (db_redis_key_add_string(&query_v, "MATCH", 5) != 0) {
+            LM_ERR("Failed to add match command to scan query\n");
+            goto err;
+        }
+        if (db_redis_key_add_string(&query_v, match, strlen(match)) != 0) {
+            LM_ERR("Failed to add match pattern to scan query\n");
+            goto err;
+        }
+        pkg_free(match); match = NULL;
+
+        reply = db_redis_command_argv(con, query_v);
+        db_redis_key_free(&query_v);
+        db_redis_check_reply(con, reply, err);
+        if (reply->type != REDIS_REPLY_ARRAY) {
+            LM_ERR("Invalid reply type for scan on table '%.*s', expected array\n",
+                    table_name->len, table_name->s);
+            goto err;
+        }
+        if (reply->elements != 2) {
+            LM_ERR("Invalid number of reply elements for scan on table '%.*s', expected 2, got %lu\n",
+                    table_name->len, table_name->s, reply->elements);
+            goto err;
+        }
+
+        if (reply->element[0]->type == REDIS_REPLY_STRING) {
+            cursor = atol(reply->element[0]->str);
+        } else if (reply->element[0]->type == REDIS_REPLY_INTEGER) {
+            // should not happen, but play it safe
+            cursor = reply->element[0]->integer;
+        } else {
+            LM_ERR("Invalid cursor type for scan on table '%.*s', expected string or integer\n",
+                    table_name->len, table_name->s);
+            goto err;
+        }
+
+        if (reply->element[1]->type != REDIS_REPLY_ARRAY) {
+            LM_ERR("Invalid content type for scan on table '%.*s', expected array\n",
+                    table_name->len, table_name->s);
+            goto err;
+        }
+        if (reply->element[1]->elements == 0) {
+            LM_DBG("no matching entries found for scan on table '%.*s'\n",
+                    table_name->len, table_name->s);
+            return 0;
+        }
+
+        *query_keys_count += reply->element[1]->elements;
+
+        for (size_t j = 0; j < reply->element[1]->elements; ++i, ++j) {
+            redisReply *key = reply->element[1]->element[j];
+            if (!key) {
+                LM_ERR("Invalid null key at cursor result index %lu while scanning table '%.*s'\n",
+                        j, table_name->len, table_name->s);
+                goto err;
+            }
+            if (key->type != REDIS_REPLY_STRING) {
+                LM_ERR("Invalid key type at cursor result index %lu while scanning table '%.*s', expected string\n",
+                        j, table_name->len, table_name->s);
+                goto err;
+            }
+            if (db_redis_key_add_string(query_keys, key->str, strlen(key->str)) != 0) {
+                LM_ERR("Failed to add redis key\n");
+                goto err;
+            }
+        }
+        db_redis_free_reply(&reply);
+    } while (cursor > 0);
+
+    // for full table scans, we have to manually match all given keys
+    *manual_keys_count = _n;
+    *manual_keys = (int*)pkg_malloc(*manual_keys_count * sizeof(int));
+    if (! *manual_keys) {
+        LM_ERR("Failed to allocate memory for manual keys\n");
+        goto err;
+    }
+    memset(*manual_keys, 0, *manual_keys_count * sizeof(int));
+    for (int i = 0; i < _n; ++i) {
+        (*manual_keys)[i] = i;
+    }
+
+    if (reply) {
+        db_redis_free_reply(&reply);
+    }
+    return 0;
+
+err:
+    if (match)
+        pkg_free(match);
+    if (reply)
+        db_redis_free_reply(&reply);
+    db_redis_key_free(&query_v);
+    db_redis_key_free(query_keys);
+    *query_keys_count = 0;
+    if (*manual_keys) {
+        pkg_free(*manual_keys);
+        *manual_keys = NULL;
+    }
+    return -1;
+}
+
+static db1_res_t* db_redis_new_result(void) {
+    db1_res_t* obj;
+
+    obj = db_new_result();
+    if (!obj)
+        return NULL;
+    return obj;
+}
+
+static int db_redis_compare_column(db_key_t k, db_val_t *v, db_op_t op, redisReply *reply) {
+    int i_value;
+    long long ll_value;
+    double d_value;
+    str *tmpstr;
+    char tmp[32] = "";
+
+    int vtype = VAL_TYPE(v);
+
+    if (VAL_NULL(v) && reply->type == REDIS_REPLY_NIL) {
+        LM_DBG("comparing matching NULL values\n");
+        return 0;
+    } else if (VAL_NULL(v) || reply->type == REDIS_REPLY_NIL) {
+        LM_DBG("comparing non-matching NULL values\n");
+        return -1;
+    }
+
+    switch (vtype) {
+        case DB1_INT:
+            i_value = atoi(reply->str);
+            LM_DBG("comparing INT %d %s %d\n", i_value, op, VAL_INT(v));
+            if (!strcmp(op, OP_EQ)) {
+                if (i_value == VAL_INT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_LT)) {
+                if (i_value < VAL_INT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_GT)) {
+                if (i_value > VAL_INT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_LEQ)) {
+                if (i_value <= VAL_INT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_GEQ)) {
+                if (i_value >= VAL_INT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_NEQ)) {
+                if (i_value != VAL_INT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_BITWISE_AND)) {
+                if (i_value & VAL_INT(v))
+                    return 0;
+            } else {
+                LM_ERR("Unsupported op type '%s'\n", op);
+                return -1;
+            }
+            return -1;
+        case DB1_BIGINT:
+            ll_value = atoll(reply->str);
+            LM_DBG("comparing BIGINT %lld %s %lld\n", ll_value, op, VAL_BIGINT(v));
+            if (!strcmp(op, OP_EQ)) {
+                if (ll_value == VAL_BIGINT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_LT)) {
+                if (ll_value < VAL_BIGINT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_GT)) {
+                if (ll_value > VAL_BIGINT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_LEQ)) {
+                if (ll_value <= VAL_BIGINT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_GEQ)) {
+                if (ll_value >= VAL_BIGINT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_NEQ)) {
+                if (ll_value != VAL_BIGINT(v))
+                    return 0;
+            } else if (!strcmp(op, OP_BITWISE_AND)) {
+                if (ll_value & VAL_BIGINT(v))
+                    return 0;
+            } else {
+                LM_ERR("Unsupported op type '%s'\n", op);
+                return -1;
+            }
+            return -1;
+        case DB1_STRING:
+            LM_DBG("comparing STRING %s %s %s\n", reply->str, op, VAL_STRING(v));
+            if (!strcmp(op, OP_EQ)) {
+                return (strcmp(reply->str, VAL_STRING(v)) == 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_LT)) {
+                return (strcmp(reply->str, VAL_STRING(v)) < 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_GT)) {
+                return (strcmp(reply->str, VAL_STRING(v)) > 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_LEQ)) {
+                return (strcmp(reply->str, VAL_STRING(v)) <= 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_GEQ)) {
+                return (strcmp(reply->str, VAL_STRING(v)) >= 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_NEQ)) {
+                return (strcmp(reply->str, VAL_STRING(v)) != 0) ? 0 : -1;
+            } else {
+                LM_ERR("Unsupported op type '%s'\n", op);
+                return -1;
+            }
+            return -1;
+        case DB1_STR:
+            tmpstr = (struct _str*) &(VAL_STR(v));
+            LM_DBG("comparing STR %s %s %.*s\n", reply->str, op, tmpstr->len, tmpstr->s);
+            if (!strcmp(op, OP_EQ)) {
+                return (strncmp(reply->str, tmpstr->s, tmpstr->len) == 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_LT)) {
+                return (strncmp(reply->str, tmpstr->s, tmpstr->len) < 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_GT)) {
+                return (strncmp(reply->str, tmpstr->s, tmpstr->len) > 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_LEQ)) {
+                return (strncmp(reply->str, tmpstr->s, tmpstr->len) <= 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_GEQ)) {
+                return (strncmp(reply->str, tmpstr->s, tmpstr->len) >= 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_NEQ)) {
+                return (strncmp(reply->str, tmpstr->s, tmpstr->len) != 0) ? 0 : -1;
+            } else {
+                LM_ERR("Unsupported op type '%s'\n", op);
+                return -1;
+            }
+            return -1;
+        case DB1_DOUBLE:
+            d_value = atof(reply->str);
+            LM_DBG("comparing DOUBLE %f %s %f\n", d_value, op, VAL_DOUBLE(v));
+            if (!strcmp(op, OP_EQ)) {
+                return (d_value == VAL_DOUBLE(v)) ? 0 : -1;
+            } else if (!strcmp(op, OP_LT)) {
+                return (d_value < VAL_DOUBLE(v)) ? 0 : -1;
+            } else if (!strcmp(op, OP_GT)) {
+                return (d_value > VAL_DOUBLE(v)) ? 0 : -1;
+            } else if (!strcmp(op, OP_LEQ)) {
+                return (d_value <= VAL_DOUBLE(v)) ? 0 : -1;
+            } else if (!strcmp(op, OP_GEQ)) {
+                return (d_value >= VAL_DOUBLE(v)) ? 0 : -1;
+            } else if (!strcmp(op, OP_NEQ)) {
+                return (d_value != VAL_DOUBLE(v)) ? 0 : -1;
+            } else {
+                LM_ERR("Unsupported op type '%s'\n", op);
+                return -1;
+            }
+            return -1;
+        case DB1_DATETIME:
+            // TODO: insert int value to db for faster comparison!
+            strftime(tmp, sizeof(tmp), "%Y-%m-%d %H:%M:%S", localtime(&(VAL_TIME(v))));
+            LM_DBG("comparing DATETIME %s %s %s\n", reply->str, op, tmp);
+            if (!strcmp(op, OP_EQ)) {
+                return (strcmp(reply->str, tmp) == 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_LT)) {
+                return (strcmp(reply->str, tmp) < 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_GT)) {
+                return (strcmp(reply->str, tmp) > 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_LEQ)) {
+                return (strcmp(reply->str, tmp) <= 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_GEQ)) {
+                return (strcmp(reply->str, tmp) >= 0) ? 0 : -1;
+            } else if (!strcmp(op, OP_NEQ)) {
+                return (strcmp(reply->str, tmp) != 0) ? 0 : -1;
+            } else {
+                LM_ERR("Unsupported op type '%s'\n", op);
+                return -1;
+            }
+            return -1;
+        case DB1_BITMAP:
+            i_value = atoi(reply->str);
+            LM_DBG("comparing BITMAP %d %s %d\n", i_value, op, VAL_BITMAP(v));
+            if (!strcmp(op, OP_EQ)) {
+                if (i_value == VAL_BITMAP(v))
+                    return 0;
+            } else if (!strcmp(op, OP_LT)) {
+                if (i_value < VAL_BITMAP(v))
+                    return 0;
+            } else if (!strcmp(op, OP_GT)) {
+                if (i_value > VAL_BITMAP(v))
+                    return 0;
+            } else if (!strcmp(op, OP_LEQ)) {
+                if (i_value <= VAL_BITMAP(v))
+                    return 0;
+            } else if (!strcmp(op, OP_GEQ)) {
+                if (i_value >= VAL_BITMAP(v))
+                    return 0;
+            } else if (!strcmp(op, OP_NEQ)) {
+                if (i_value != VAL_BITMAP(v))
+                    return 0;
+            } else if (!strcmp(op, OP_BITWISE_AND)) {
+                if (i_value & VAL_BITMAP(v))
+                    return 0;
+            } else {
+                LM_ERR("Unsupported op type '%s'\n", op);
+                return -1;
+            }
+            return -1;
+        case DB1_BLOB:
+        default:
+            LM_ERR("Unsupported val type %d\n", vtype);
+            return -1;
+    }
+}
+
+static int db_redis_convert_row(km_redis_con_t *con, db1_res_t* _r, const db_key_t *_k,
+        const db_val_t *_v, const db_op_t *_op,
+        redisReply *reply, const str *table_name, const db_key_t* _c, int _nc,
+        int *manual_keys, int manual_keys_count) {
+    db_val_t* dval;
+    db_row_t* drow;
+
+    if (reply->type != REDIS_REPLY_ARRAY) {
+        LM_ERR("Unexpected redis reply type, expecting array\n");
+        return -1;
+    }
+
+    if (!reply->elements) {
+        LM_DBG("skip empty row");
+        return 0;
+    }
+
+    // manually filter non-matching replies
+    for (size_t col = 0; col < reply->elements; ++col) {
+        if (col < manual_keys_count) {
+            int idx = manual_keys[col];
+            db_key_t k = _k[idx];
+            db_val_t v = _v[idx];
+            db_op_t o = _op[idx];
+            LM_DBG("manually filtering key '%.*s'\n",
+                    k->len, k->s);
+            if (db_redis_compare_column(k, &v, o, reply->element[col]) != 0) {
+                LM_DBG("column %lu does not match, ignore row\n", col);
+                return 0;
+            }
+        }
+    }
+
+    RES_NUM_ROWS(_r) = RES_ROW_N(_r) = RES_NUM_ROWS(_r) + 1;
+    drow = &(RES_ROWS(_r)[RES_NUM_ROWS(_r)-1]);
+
+    if (db_allocate_row(_r, drow) != 0) {
+        LM_ERR("Failed to allocate row %d\n", RES_NUM_ROWS(_r));
+        return -1;
+    }
+
+    if (reply->elements - manual_keys_count > RES_COL_N(_r)) {
+        LM_ERR("Invalid number of columns at row %d/%d, expecting %d, got %lu\n",
+                RES_NUM_ROWS(_r), RES_ROW_N(_r), RES_COL_N(_r), reply->elements - manual_keys_count);
+        return -1;
+    }
+    for (size_t col = manual_keys_count; col < reply->elements; ++col) {
+        size_t colidx = col - manual_keys_count;
+        size_t redisidx = col;
+        int coltype;
+        redisReply *col_val = reply->element[redisidx];
+        str *col_name = _c[colidx];
+
+        LM_DBG("converting column #%lu of row #%d", colidx, RES_ROW_N(_r));
+
+        if (col_val->type != REDIS_REPLY_STRING &&
+            col_val->type != REDIS_REPLY_NIL) {
+
+            LM_ERR("Invalid column value type in column '%.*s' of row %d, expecting string or null\n",
+                    col_name->len, col_name->s, RES_NUM_ROWS(_r));
+            return -1;
+        }
+
+        if (RES_NUM_ROWS(_r) == 1) {
+            coltype = db_redis_schema_get_column_type(con, table_name, col_name);
+            RES_TYPES(_r)[colidx] = coltype;
+        } else {
+            coltype = RES_TYPES(_r)[colidx];
+        }
+
+        dval = &(ROW_VALUES(drow)[colidx]);
+        VAL_TYPE(dval) = coltype;
+
+        if (col_val->type == REDIS_REPLY_NIL) {
+            VAL_NULL(dval) = 1;
+        } else {
+            if (db_str2val(coltype, dval, col_val->str, strlen(col_val->str), 1) != 0) {
+                LM_ERR("Failed to convert redis column '%.*s' to db value\n",
+                        col_name->len, col_name->s);
+                return -1;
+            }
+        }
+    }
+
+    return 0;
+}
+
+static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, const db_key_t* _k,
+        const db_val_t* _v, const db_op_t *_op, const db_key_t* _c,
+        const int _n, const int _nc, db1_res_t** _r,
+        redis_key_t **keys, int *keys_count,
+        int **manual_keys, int *manual_keys_count, int do_table_scan) {
+
+    redisReply *reply = NULL;
+    redis_key_t *query_v = NULL;
+    int num_rows = 0;
+
+    *_r = db_redis_new_result();
+    if (!*_r) {
+        LM_ERR("Failed to allocate memory for result");
+        goto error;
+    }
+
+    if (db_allocate_columns(*_r, _nc) != 0) {
+        LM_ERR("Failed to allocate memory for result columns");
+        goto error;
+    }
+    RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
+    RES_COL_N(*_r) = _nc;
+
+    if (!keys_count && do_table_scan) {
+        LM_DBG("performing full table scan\n");
+        if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n,
+                    keys, keys_count,
+                    manual_keys, manual_keys_count) != 0) {
+            LM_ERR("failed to scan query keys\n");
+            goto error;
+        }
+    }
+
+
+
+    LM_DBG("++++++ going over query keys\n");
+    for (redis_key_t *key = *keys; key; key = key->next) {
+        redis_key_t *tmp = NULL;
+        str *keyname = &(key->key);
+
+        num_rows++;
+
+        LM_DBG("checking key '%s' in redis\n", keyname->s);
+
+        if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
+            LM_ERR("Failed to add exists query to list\n");
+            goto error;
+        }
+        if (db_redis_key_add_str(&query_v, keyname) != 0) {
+            LM_ERR("Failed to add key name to list\n");
+            goto error;
+        }
+        if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
+            LM_ERR("Failed to append redis command\n");
+            goto error;
+        }
+        tmp = db_redis_key_unshift(&query_v);
+        if (tmp)
+            db_redis_key_free(&tmp);
+
+        // construct HMGET query
+        if (_nc + (*manual_keys_count) == 0) {
+            if (db_redis_key_prepend_string(&query_v, "HGETALL", 7) != 0) {
+                LM_ERR("Failed to add hgetall query to list\n");
+                goto error;
+            }
+        } else {
+            if (db_redis_key_prepend_string(&query_v, "HMGET", 5) != 0) {
+                LM_ERR("Failed to add hmget query to list\n");
+                goto error;
+            }
+        }
+
+        // we put the manual comparison columns first, so we can skip them
+        // easily in result, for the cost of potential duplicate column returns
+        for (int j = 0; j < *manual_keys_count; ++j) {
+            int idx = (*manual_keys)[j];
+            str *k_name = _k[idx];
+            if (db_redis_key_add_str(&query_v, k_name) != 0) {
+                LM_ERR("Failed to add manual key to query list\n");
+                goto error;
+            }
+        }
+        for (int j = 0; j < _nc; ++j) {
+            str *k_name = _c[j];
+            if (db_redis_key_add_str(&query_v, k_name) != 0) {
+                LM_ERR("Failed to add manual key to query list\n");
+                goto error;
+            }
+        }
+
+        if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
+            LM_ERR("Failed to append redis command\n");
+            goto error;
+        }
+
+        db_redis_key_free(&query_v);
+        query_v = NULL;
+    }
+
+    // we allocate best case scenario (all rows match)
+    RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = num_rows;
+    if (db_allocate_rows(*_r) != 0) {
+        LM_ERR("Failed to allocate memory for rows\n");
+        return -1;
+    }
+    RES_COL_N(*_r) = _nc;
+    // reset and increment in convert_row
+    RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
+
+    for (redis_key_t *key = *keys; key; key = key->next) {
+        // get reply for EXISTS query
+        if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
+            LM_ERR("Failed to get reply for query: %s\n",
+                    con->con->errstr);
+            goto error;
+        }
+        db_redis_check_reply(con, reply, error);
+        if (reply->integer == 0) {
+            LM_DBG("key does not exist, returning no row for query\n");
+            db_redis_free_reply(&reply);
+            // also free next reply, as this is a null row for the HMGET
+            db_redis_get_reply(con, (void**)&reply);
+            db_redis_check_reply(con, reply, error);
+            db_redis_free_reply(&reply);
+            continue;
+        }
+        db_redis_free_reply(&reply);
+
+        // get reply for actual HMGET query
+        if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
+            LM_ERR("Failed to get reply for query: %s\n",
+                    con->con->errstr);
+            goto error;
+        }
+        db_redis_check_reply(con, reply, error);
+        if (reply->type != REDIS_REPLY_ARRAY) {
+            LM_ERR("Unexpected reply, expected array\n");
+            goto error;
+        }
+        LM_DBG("dumping full query reply for row\n");
+        db_redis_dump_reply(reply);
+
+        if (db_redis_convert_row(con, *_r, _k, _v, _op, reply, CON_TABLE(_h), _c, _nc, *manual_keys, *manual_keys_count)) {
+            LM_ERR("Failed to convert redis reply for row\n");
+            goto error;
+        }
+        db_redis_free_reply(&reply);
+    }
+
+    return 0;
+
+error:
+    LM_ERR("failed to perform the query\n");
+    db_redis_key_free(&query_v);
+    if(reply)
+        db_redis_free_reply(&reply);
+    if(_r && *_r) {
+        db_redis_free_result((db1_con_t*)_h, *_r); *_r = NULL;
+    }
+    return -1;
+}
+
+static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, const db_key_t* _k,
+        const db_val_t* _v, const db_op_t *_op, const int _n,
+        redis_key_t *keys, int keys_count,
+        int *manual_keys, int manual_keys_count, int do_table_scan) {
+
+    int j = 0;
+    redis_key_t *k = NULL;
+    int type_keys_count = 0;
+    int all_type_keys_count = 0;
+
+    redisReply *reply = NULL;
+    redis_key_t *query_v = NULL;
+    redis_key_t *type_keys = NULL;
+    redis_key_t *all_type_keys = NULL;
+    db_val_t *db_vals = NULL;
+    db_key_t *db_keys = NULL;
+
+    if (!keys_count && do_table_scan) {
+        LM_DBG("performing full table scan\n");
+        if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n,
+                    &keys, &keys_count,
+                    &manual_keys, &manual_keys_count) != 0) {
+            LM_ERR("failed to scan query keys\n");
+            goto error;
+        }
+    }
+
+    // TODO: this should be moved to redis_connection structure
+    // and be parsed at startup:
+    //
+    // fetch list of keys in all types
+    if (db_redis_get_keys_for_all_types(con, CON_TABLE(_h),
+                &all_type_keys, &all_type_keys_count) != 0) {
+            LM_ERR("failed to get full list of type keys\n");
+            goto error;
+    }
+
+    LM_DBG("+++ delete all keys\n");
+    for (k = keys; k; k = k->next) {
+        redis_key_t *all_type_key;
+        str *key = &k->key;
+        redis_key_t *tmp = NULL;
+        int row_match;
+        LM_DBG("+++ delete key '%.*s'\n", key->len, key->s);
+
+        if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
+            LM_ERR("Failed to add exists command to pre-delete query\n");
+            goto error;
+        }
+        if (db_redis_key_add_str(&query_v, key) != 0) {
+            LM_ERR("Failed to add key name to pre-delete query\n");
+            goto error;
+        }
+
+        // TODO: pipeline commands!
+        reply = db_redis_command_argv(con, query_v);
+        db_redis_check_reply(con, reply, error);
+        if (reply->integer == 0) {
+            LM_DBG("key does not exist in redis, skip deleting\n");
+            db_redis_free_reply(&reply);
+            continue;
+        }
+        db_redis_free_reply(&reply);
+        tmp = db_redis_key_unshift(&query_v);
+        if (tmp)
+            db_redis_key_free(&tmp);
+
+        if (db_redis_key_prepend_string(&query_v, "HMGET", 5) != 0) {
+            LM_ERR("Failed to set hmget command to pre-delete query\n");
+            goto error;
+        }
+
+        // add all manual keys to query
+        for (j = 0; j < manual_keys_count; ++j) {
+            int idx = manual_keys[j];
+            str *col = _k[idx];
+
+            if (db_redis_key_add_str(&query_v, col) != 0) {
+                LM_ERR("Failed to add manual key to pre-delete query\n");
+                goto error;
+            }
+        }
+        // add all type keys to query
+        for (all_type_key = all_type_keys; all_type_key; all_type_key = all_type_key->next) {
+            if (db_redis_key_add_str(&query_v, &all_type_key->key) != 0) {
+                LM_ERR("Failed to add type key to pre-delete query\n");
+                goto error;
+            }
+        }
+        reply = db_redis_command_argv(con, query_v);
+        db_redis_key_free(&query_v);
+        db_redis_check_reply(con, reply, error);
+
+        LM_DBG("dumping full query reply\n");
+        db_redis_dump_reply(reply);
+
+        // manually filter non-matching replies
+        row_match = 1;
+        for (size_t col = 0; col < reply->elements; ++col) {
+            if (col < manual_keys_count) {
+                int idx = manual_keys[col];
+                db_key_t k = _k[idx];
+                db_val_t v = _v[idx];
+                db_op_t o = _op[idx];
+                LM_DBG("manually filtering key '%.*s'\n",
+                        k->len, k->s);
+                if (db_redis_compare_column(k, &v, o, reply->element[col]) != 0) {
+                    LM_DBG("column %lu does not match, ignore row\n", col);
+                    row_match = 0;
+                    break;
+                }
+            }
+        }
+        if (!row_match) {
+            db_redis_free_reply(&reply);
+            continue;
+        } else {
+            LM_DBG("row matches manual filtering, proceed with deletion\n");
+        }
+
+        db_keys = (db_key_t*) pkg_malloc(all_type_keys_count * sizeof(db_key_t));
+        if (!db_keys) {
+            LM_ERR("Failed to allocate memory for db type keys\n");
+            goto error;
+        }
+        for (j = 0, tmp = all_type_keys; tmp; ++j, tmp = tmp->next) {
+            db_keys[j] = &tmp->key;
+        }
+
+        db_vals = (db_val_t*) pkg_malloc(all_type_keys_count * sizeof(db_val_t));
+        if (!db_vals) {
+            LM_ERR("Failed to allocate memory for manual db vals\n");
+            goto error;
+        }
+
+        for (j = 0, all_type_key = all_type_keys; all_type_key; ++j, all_type_key = all_type_key->next) {
+            db_val_t *v = &(db_vals[j]);
+            str *key = &all_type_key->key;
+            char *value = reply->element[manual_keys_count + j]->str;
+            int coltype = db_redis_schema_get_column_type(con, CON_TABLE(_h), key);
+            if (value == NULL) {
+                VAL_NULL(v) = 1;
+            } else if (db_str2val(coltype, v, value, strlen(value), 0) != 0) {
+                LM_ERR("Failed to convert redis reply column to db value\n");
+                goto error;
+            }
+        }
+        if (db_redis_build_type_keys(con, CON_TABLE(_h), db_keys, db_vals, all_type_keys_count,
+                    &type_keys, &type_keys_count) != 0) {
+            LM_ERR("failed to build type keys\n");
+            goto error;
+        }
+        pkg_free(db_keys);
+        pkg_free(db_vals);
+        db_redis_free_reply(&reply);
+
+        if (db_redis_key_add_string(&query_v, "DEL", 3) != 0) {
+            LM_ERR("Failed to add del command to delete query\n");
+            goto error;
+        }
+        if (db_redis_key_add_str(&query_v, key) != 0) {
+            LM_ERR("Failed to add key to delete query\n");
+            goto error;
+        }
+
+        reply = db_redis_command_argv(con, query_v);
+        db_redis_key_free(&query_v);
+        db_redis_check_reply(con, reply, error);
+        db_redis_free_reply(&reply);
+
+        for (redis_key_t *type_key = type_keys; type_key; type_key = type_key->next) {
+            if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) {
+                LM_ERR("Failed to add srem command to post-delete query\n");
+                goto error;
+            }
+            if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
+                LM_ERR("Failed to add key to delete query\n");
+                goto error;
+            }
+            if (db_redis_key_add_str(&query_v, key) != 0) {
+                LM_ERR("Failed to add key to delete query\n");
+                goto error;
+            }
+            reply = db_redis_command_argv(con, query_v);
+            db_redis_key_free(&query_v);
+            db_redis_check_reply(con, reply, error);
+            db_redis_free_reply(&reply);
+        }
+
+        //db_redis_key_free(&type_keys);
+        LM_DBG("+++ done with loop '%.*s'\n", k->key.len, k->key.s);
+    }
+    pkg_free(query_v);
+    db_redis_key_free(&type_keys);
+    db_redis_key_free(&all_type_keys);
+
+    return 0;
+
+error:
+    LM_ERR("failed to perform the delete\n");
+    if(reply)
+        db_redis_free_reply(&reply);
+    if (db_keys)
+        pkg_free(db_keys);
+    if (db_vals)
+        pkg_free(db_vals);
+    db_redis_key_free(&query_v);
+    db_redis_key_free(&type_keys);
+    db_redis_key_free(&all_type_keys);
+    return -1;
+}
+
+static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, const db_key_t* _k,
+        const db_val_t* _v, const db_op_t *_op, const db_key_t* _uk, const db_val_t *_uv,
+        const int _n, const int _nu,
+        redis_key_t **keys, int *keys_count,
+        int **manual_keys, int *manual_keys_count, int do_table_scan) {
+
+    redisReply *reply = NULL;
+    redis_key_t *query_v = NULL;
+    int update_queries = 0;
+
+    if (!keys_count && do_table_scan) {
+        LM_DBG("performing full table scan\n");
+        if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n,
+                    keys, keys_count,
+                    manual_keys, manual_keys_count) != 0) {
+            LM_ERR("failed to scan query keys\n");
+            goto error;
+        }
+    }
+
+    for (redis_key_t *key = *keys; key; key = key->next) {
+        str *keyname = &key->key;
+
+        LM_DBG("fetching row for '%.*s' from redis\n", keyname->len, keyname->s);
+
+
+        if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
+            LM_ERR("Failed to set exists command to pre-update exists query\n");
+            goto error;
+        }
+        if (db_redis_key_add_str(&query_v, keyname) != 0) {
+            LM_ERR("Failed to add key name to pre-update exists query\n");
+            goto error;
+        }
+        if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
+            LM_ERR("Failed to append redis command\n");
+            goto error;
+        }
+        db_redis_key_free(&query_v);
+
+        // construct HMGET query
+        if ((*manual_keys_count) == 0) {
+            if (db_redis_key_add_string(&query_v, "HGETALL", 7) != 0) {
+                LM_ERR("Failed to set hgetall command to pre-update hget query\n");
+                goto error;
+            }
+            // TODO: actually we wouldn't have to fetch it at all, but then we'd
+            // have to mark this key telling to not fetch reply of HMGET after
+            // EXISTS returns false!
+        } else {
+            if (db_redis_key_add_string(&query_v, "HMGET", 5) != 0) {
+                LM_ERR("Failed to set hgetall command to pre-update hget query\n");
+                goto error;
+            }
+        }
+        if (db_redis_key_add_str(&query_v, keyname) != 0) {
+            LM_ERR("Failed to add key name to pre-update exists query\n");
+            goto error;
+        }
+
+        for (int j = 0; j < *manual_keys_count; ++j) {
+            int idx = (*manual_keys)[j];
+            str *k_name = _k[idx];
+            if (db_redis_key_add_str(&query_v, k_name) != 0) {
+                LM_ERR("Failed to add manual key name to hget query\n");
+                goto error;
+            }
+        }
+
+        if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
+            LM_ERR("Failed to append redis command\n");
+            goto error;
+        }
+        db_redis_key_free(&query_v);
+    }
+
+/*
+    key_value = (str*)pkg_malloc(_nu * sizeof(str));
+    if (!key_value) {
+        LM_ERR("Failed to allocate memory for key buffer\n");
+        goto error;
+    }
+    memset(key_value, 0, _nu * sizeof(str));
+
+    col_value = (str*)pkg_malloc(_nu * sizeof(str));
+    if (!col_value) {
+        LM_ERR("Failed to allocate memory for column buffer\n");
+        goto error;
+    }
+    memset(col_value, 0, _nu * sizeof(str));
+    */
+
+
+    for (redis_key_t *key = *keys; key; key = key->next) {
+        int row_match;
+
+        LM_DBG("fetching replies for '%.*s' from redis\n", key->key.len, key->key.s);
+
+        // get reply for EXISTS query
+        if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
+            LM_ERR("Failed to get reply for query: %s\n",
+                    con->con->errstr);
+            goto error;
+        }
+        db_redis_check_reply(con, reply, error);
+        if (reply->integer == 0) {
+            LM_DBG("key does not exist, returning no row for query\n");
+            db_redis_free_reply(&reply);
+            // also free next reply, as this is a null row for the HMGET
+            LM_DBG("also fetch hmget reply after non-existent key\n");
+            db_redis_get_reply(con, (void**)&reply);
+            db_redis_check_reply(con, reply, error);
+            db_redis_free_reply(&reply);
+            LM_DBG("continue fetch reply loop\n");
+            continue;
+        }
+        db_redis_free_reply(&reply);
+
+        // get reply for actual HMGET query
+        if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
+            LM_ERR("Failed to get reply for query: %s\n",
+                    con->con->errstr);
+            goto error;
+        }
+        db_redis_check_reply(con, reply, error);
+        if (reply->type != REDIS_REPLY_ARRAY) {
+            LM_ERR("Unexpected reply, expected array\n");
+            goto error;
+        }
+        LM_DBG("dumping full query reply for row\n");
+        db_redis_dump_reply(reply);
+
+        // manually filter non-matching replies
+        row_match = 1;
+        for (size_t col = 0; col < reply->elements; ++col) {
+            if (col < *manual_keys_count) {
+                int idx = (*manual_keys)[col];
+                db_key_t k = _k[idx];
+                db_val_t v = _v[idx];
+                db_op_t o = _op[idx];
+                LM_DBG("manually filtering key '%.*s'\n",
+                        k->len, k->s);
+                if (db_redis_compare_column(k, &v, o, reply->element[col]) != 0) {
+                    LM_DBG("column %lu does not match, ignore row\n", col);
+                    row_match = 0;
+                    break;
+                }
+            }
+        }
+        db_redis_free_reply(&reply);
+        if (!row_match) {
+            continue;
+        } else {
+            LM_DBG("row matches manual filtering, proceed with update\n");
+        }
+
+        if (db_redis_key_add_string(&query_v, "HMSET", 5) != 0) {
+            LM_ERR("Failed to add hmset command to update query\n");
+            goto error;
+        }
+        if (db_redis_key_add_str(&query_v, &key->key) != 0) {
+            LM_ERR("Failed to add key to update query\n");
+            goto error;
+        }
+
+        for (int i = 0; i < _nu; ++i) {
+            str *k = _uk[i];
+            str v = {NULL, 0};
+
+            if (db_redis_val2str(&(_uv[i]), &v) != 0) {
+                LM_ERR("Failed to convert update value to string\n");
+                goto error;
+            }
+            if (db_redis_key_add_str(&query_v, k) != 0) {
+                LM_ERR("Failed to add key to update query\n");
+                goto error;
+            }
+            if (db_redis_key_add_str(&query_v, &v) != 0) {
+                LM_ERR("Failed to add key to update query\n");
+                goto error;
+            }
+            pkg_free(v.s);
+        }
+        update_queries++;
+        if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
+            LM_ERR("Failed to append redis command\n");
+            goto error;
+        }
+
+        db_redis_key_free(&query_v);
+    }
+
+    LM_DBG("getting replies for %d queries\n", update_queries);
+
+    for (int i = 0; i < update_queries; ++i) {
+        if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
+            LM_ERR("Failed to get reply for query: %s\n",
+                    con->con->errstr);
+            goto error;
+        }
+        db_redis_check_reply(con, reply, error);
+        db_redis_free_reply(&reply);
+    }
+
+    LM_DBG("done performing update\n");
+
+    return 0;
+
+error:
+    LM_ERR("failed to perform the update\n");
+    if (reply)
+        db_redis_free_reply(&reply);
+    db_redis_key_free(&query_v);
+    return -1;
+}
+
+
+/*
+ * Query table for specified rows
+ * _h: structure representing database connection
+ * _k: key names
+ * _op: operators
+ * _v: values of the keys that must match
+ * _c: column names to return
+ * _n: number of key=values pairs to compare
+ * _nc: number of columns to return
+ * _o: order by the specified column
+ */
+int db_redis_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
+        const db_val_t* _v, const db_key_t* _c, const int _n, const int _nc,
+        const db_key_t _o, db1_res_t** _r)
+{
+    km_redis_con_t *con = NULL;
+    int free_op = 0;
+    int do_table_scan = 0;
+
+    redis_key_t *keys = NULL;
+    int keys_count = 0;
+    int *manual_keys = NULL;
+    int manual_keys_count = 0;
+    db_op_t *query_ops = NULL;
+
+    // TODO: implement order-by
+    // TODO: optimize mapping-based manual post-check (remove check for keys already
+    // in type query key)
+
+    con = REDIS_CON(_h);
+    if (con && con->con == NULL) {
+        if (db_redis_connect(con) != 0) {
+            LM_ERR("Failed to reconnect to server\n");
+            return -1;
+        }
+    }
+    if(con == NULL || con->id == NULL || con->con == NULL) {
+        LM_ERR("connection to server is null\n");
+        return -1;
+    }
+    if(CON_TABLE(_h)->s == NULL) {
+        LM_ERR("prefix (table) name not set\n");
+        return -1;
+    } else {
+        LM_DBG("querying prefix (table) '%.*s'\n",
+                CON_TABLE(_h)->len, CON_TABLE(_h)->s);
+    }
+
+    if(_r) *_r = NULL;
+    free_op = 0;
+
+    if (_op == NULL) {
+        char *op = "=";
+        free_op = 1;
+        query_ops = (db_op_t*)pkg_malloc(_n * sizeof(db_op_t));
+        if (!query_ops) {
+            LM_ERR("Failed to allocate memory for query op list\n");
+            goto error;
+        }
+        for (int i = 0; i < _n; ++i) {
+            query_ops[i] = op;
+        }
+    } else {
+        query_ops = (db_op_t*)_op;
+    }
+
+    if (_n > 0) {
+        if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
+                    &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan) != 0) {
+            LM_ERR("failed to build query keys\n");
+            goto error;
+        }
+        if (!keys_count) {
+            if (do_table_scan) {
+                LM_DBG("unable to build query keys, falling back to full table scan\n");
+            } else {
+                LM_DBG("query keys not member of suitable mapping, skip full table scan\n");
+            }
+        }
+    } else {
+        LM_DBG("no columns given to build query keys, falling back to full table scan\n");
+        keys_count = 0;
+    }
+
+    if (db_redis_perform_query(_h, con, _k, _v, query_ops, _c, _n, _nc, _r,
+        &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan) != 0) {
+        goto error;
+    }
+
+    LM_DBG("done performing query\n");
+
+    if (free_op && query_ops) {
+        pkg_free(query_ops);
+    }
+    db_redis_key_free(&keys);
+
+    if (manual_keys) {
+        pkg_free(manual_keys);
+    }
+
+    db_redis_consume_replies(con);
+    return 0;
+
+error:
+    LM_ERR("failed to do the query\n");
+    if (free_op && query_ops) {
+        pkg_free(query_ops);
+    }
+    db_redis_key_free(&keys);
+    if (manual_keys) {
+        pkg_free(manual_keys);
+    }
+    db_redis_consume_replies(con);
+
+
+    return -1;
+}
+
+/*
+ * Execute a raw SQL query
+ */
+int db_redis_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r)
+{
+    LM_DBG("perform redis raw query\n");
+    return -1;
+}
+
+/*
+ * Insert a row into specified table
+ * _h: structure representing database connection
+ * _k: key names
+ * _v: values of the keys
+ * _n: number of key=value pairs
+ */
+int db_redis_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, const int _n)
+{
+    km_redis_con_t *con = NULL;
+    redis_key_t *key = NULL;
+    int keys_count = 0;
+    redis_key_t *type_keys = NULL;
+    int type_keys_count = 0;
+    redis_key_t *query_v = NULL;
+    redisReply *reply = NULL;
+
+    con = REDIS_CON(_h);
+    if (con && con->con == NULL) {
+        if (db_redis_connect(con) != 0) {
+            LM_ERR("Failed to reconnect to server\n");
+            return -1;
+        }
+    }
+    if(con == NULL || con->id == NULL || con->con == NULL) {
+        LM_ERR("connection to server is null\n");
+        return -1;
+    }
+    if(CON_TABLE(_h)->s == NULL) {
+        LM_ERR("prefix (table) name not set\n");
+        return -1;
+    } else {
+        LM_DBG("inserting to prefix (table) '%.*s'\n",
+                CON_TABLE(_h)->len, CON_TABLE(_h)->s);
+    }
+
+    if (db_redis_build_entry_keys(con, CON_TABLE(_h), _k, _v, _n,
+                &key, &keys_count) != 0) {
+        LM_ERR("failed to build entry keys\n");
+        goto error;
+    }
+    if (db_redis_build_type_keys(con, CON_TABLE(_h), _k, _v, _n,
+                &type_keys, &type_keys_count) != 0) {
+        LM_ERR("failed to build type keys\n");
+        goto error;
+    }
+
+    if (db_redis_key_add_string(&query_v, "HMSET", 5) != 0) {
+        LM_ERR("Failed to add hmset command to insert query\n");
+        goto error;
+    }
+    if (db_redis_key_add_str(&query_v, &key->key) != 0) {
+        LM_ERR("Failed to add key to insert query\n");
+        goto error;
+    }
+
+    for (int i = 0; i < _n; ++i) {
+        str *k = _k[i];
+        str v;
+
+        if (db_redis_key_add_str(&query_v, k) != 0) {
+            LM_ERR("Failed to add column name to insert query\n");
+            goto error;
+        }
+        if (db_redis_val2str(&(_v[i]), &v) != 0) {
+            LM_ERR("Failed to allocate memory for query value\n");
+            goto error;
+        }
+        if (db_redis_key_add_str(&query_v, &v) != 0) {
+            LM_ERR("Failed to add column value to insert query\n");
+            goto error;
+        }
+        pkg_free(v.s);
+    }
+
+    reply = db_redis_command_argv(con, query_v);
+    db_redis_key_free(&query_v);
+    db_redis_check_reply(con, reply, error);
+    db_redis_free_reply(&reply);
+
+    for (redis_key_t *k = type_keys; k; k = k->next) {
+        str *type_key = &k->key;
+
+        LM_DBG("inserting entry key '%.*s' to type map '%.*s'\n",
+            key->key.len, key->key.s, type_key->len, type_key->s);
+
+        if (db_redis_key_add_string(&query_v, "SADD", 4) != 0) {
+            LM_ERR("Failed to set sadd command to post-insert query\n");
+            goto error;
+        }
+        if (db_redis_key_add_str(&query_v, type_key) != 0) {
+            LM_ERR("Failed to add map key to post-insert query\n");
+            goto error;
+        }
+        if (db_redis_key_add_str(&query_v, &key->key) != 0) {
+            LM_ERR("Failed to set entry key to post-insert query\n");
+            goto error;
+        }
+
+        reply = db_redis_command_argv(con, query_v);
+        db_redis_key_free(&query_v);
+        db_redis_check_reply(con, reply, error);
+        db_redis_free_reply(&reply);
+    }
+
+    db_redis_key_free(&key);
+    db_redis_key_free(&type_keys);
+    db_redis_consume_replies(con);
+
+    return 0;
+
+error:
+    db_redis_key_free(&key);
+    db_redis_key_free(&type_keys);
+    db_redis_key_free(&query_v);
+
+    if (reply)
+        db_redis_free_reply(&reply);
+
+    db_redis_consume_replies(con);
+
+    LM_ERR("failed to do the insert\n");
+    return -1;
+}
+
+/*
+ * Delete a row from the specified table
+ * _h: structure representing database connection
+ * _k: key names
+ * _o: operators
+ * _v: values of the keys that must match
+ * _n: number of key=value pairs
+ */
+int db_redis_delete(const db1_con_t* _h, const db_key_t* _k,
+        const db_op_t* _op, const db_val_t* _v, const int _n)
+{
+    km_redis_con_t *con = NULL;
+    redis_key_t *keys = NULL;
+    int keys_count = 0;
+    int *manual_keys = NULL;
+    int manual_keys_count = 0;
+    int free_op = 0;
+    int do_table_scan = 0;
+    db_op_t *query_ops = NULL;
+
+    // TODO: optimize mapping-based manual post-check (remove check for keys already
+    // in type query key)
+
+    con = REDIS_CON(_h);
+    if (con && con->con == NULL) {
+        if (db_redis_connect(con) != 0) {
+            LM_ERR("Failed to reconnect to server\n");
+            return -1;
+        }
+    }
+    if(con == NULL || con->id == NULL || con->con == NULL) {
+        LM_ERR("connection to server is null\n");
+        return -1;
+    }
+    if(CON_TABLE(_h)->s == NULL) {
+        LM_ERR("prefix (table) name not set\n");
+        return -1;
+    } else {
+        LM_DBG("deleting from prefix (table) '%.*s'\n",
+                CON_TABLE(_h)->len, CON_TABLE(_h)->s);
+    }
+
+    free_op = 0;
+
+    if (_op == NULL) {
+        char *op = "=";
+        free_op = 1;
+        query_ops = (db_op_t*)pkg_malloc(_n * sizeof(db_op_t));
+        if (!query_ops) {
+            LM_ERR("Failed to allocate memory for query op list\n");
+            goto error;
+        }
+        for (int i = 0; i < _n; ++i) {
+            query_ops[i] = op;
+        }
+    } else {
+        query_ops = (db_op_t*)_op;
+    }
+
+    if (_n > 0) {
+        if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
+                    &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan) != 0) {
+            LM_ERR("failed to build query keys\n");
+            goto error;
+        }
+        if (!keys_count) {
+            if (do_table_scan) {
+                LM_DBG("unable to build query keys, falling back to full table scan\n");
+            } else {
+                LM_DBG("query keys not member of suitable mapping, skip full table scan\n");
+            }
+        }
+    } else {
+        LM_DBG("no columns given to build query keys, falling back to full table scan\n");
+        keys_count = 0;
+    }
+
+    if (db_redis_perform_delete(_h, con, _k, _v, query_ops, _n,
+        keys, keys_count, manual_keys, manual_keys_count, do_table_scan) != 0) {
+        goto error;
+    }
+
+    LM_DBG("done performing delete\n");
+
+    if (free_op && query_ops) {
+        pkg_free(query_ops);
+    }
+    db_redis_key_free(&keys);
+    if (manual_keys)
+        pkg_free(manual_keys);
+    db_redis_consume_replies(con);
+
+    return 0;
+
+error:
+    LM_ERR("failed to do the query\n");
+    if (free_op && query_ops) {
+        pkg_free(query_ops);
+    }
+    db_redis_key_free(&keys);
+    if (manual_keys)
+        pkg_free(manual_keys);
+    db_redis_consume_replies(con);
+    return -1;
+}
+
+/*
+ * Update some rows in the specified table
+ * _h: structure representing database connection
+ * _k: key names
+ * _op: operators
+ * _v: values of the keys that must match
+ * _uk: updated columns
+ * _uv: updated values of the columns
+ * _n: number of key=value pairs
+ * _un: number of columns to update
+ */
+int db_redis_update(const db1_con_t* _h, const db_key_t* _k,
+        const db_op_t* _op, const db_val_t* _v, const db_key_t* _uk,
+        const db_val_t* _uv, const int _n, const int _nu)
+{
+    km_redis_con_t *con = NULL;
+    int free_op = 0;
+    int do_table_scan = 0;
+
+    redis_key_t *keys = NULL;
+    int keys_count = 0;
+    int *manual_keys = NULL;
+    int manual_keys_count = 0;
+    db_op_t *query_ops = NULL;
+
+    // TODO: optimize mapping-based manual post-check (remove check for keys already
+    // in type query key)
+
+    con = REDIS_CON(_h);
+    if (con && con->con == NULL) {
+        if (db_redis_connect(con) != 0) {
+            LM_ERR("Failed to reconnect to server\n");
+            return -1;
+        }
+    }
+    if(con == NULL || con->id == NULL || con->con == NULL) {
+        LM_ERR("connection to server is null\n");
+        return -1;
+    }
+    if(CON_TABLE(_h)->s == NULL) {
+        LM_ERR("prefix (table) name not set\n");
+        return -1;
+    } else {
+        LM_DBG("updating prefix (table) '%.*s'\n",
+                CON_TABLE(_h)->len, CON_TABLE(_h)->s);
+    }
+
+    free_op = 0;
+
+    if (_op == NULL) {
+        char *op = "=";
+        free_op = 1;
+        query_ops = (db_op_t*)pkg_malloc(_n * sizeof(db_op_t));
+        if (!query_ops) {
+            LM_ERR("Failed to allocate memory for query op list\n");
+            goto error;
+        }
+        for (int i = 0; i < _n; ++i) {
+            query_ops[i] = op;
+        }
+    } else {
+        query_ops = (db_op_t*)_op;
+    }
+
+    if (_n > 0) {
+        if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
+                    &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan) != 0) {
+            LM_ERR("failed to build query keys\n");
+            goto error;
+        }
+        if (!keys_count) {
+            if (do_table_scan) {
+                LM_DBG("unable to build query keys, falling back to full table scan\n");
+            } else {
+                LM_DBG("query keys not member of suitable mapping, skip full table scan\n");
+            }
+        }
+    } else {
+        LM_DBG("no columns given to build query keys, falling back to full table scan\n");
+        keys_count = 0;
+    }
+
+    if (db_redis_perform_update(_h, con, _k, _v, query_ops, _uk, _uv, _n, _nu,
+        &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan) != 0) {
+        goto error;
+    }
+
+    LM_DBG("done performing update\n");
+
+    if (free_op && query_ops) {
+        pkg_free(query_ops);
+    }
+    db_redis_key_free(&keys);
+
+    if (manual_keys) {
+        pkg_free(manual_keys);
+    }
+    db_redis_consume_replies(con);
+    return 0;
+
+error:
+    LM_ERR("failed to do the query\n");
+    if (free_op && query_ops) {
+        pkg_free(query_ops);
+    }
+    db_redis_key_free(&keys);
+    if (manual_keys) {
+        pkg_free(manual_keys);
+    }
+    db_redis_consume_replies(con);
+    return -1;
+}
+
+/*
+ * Just like insert, but replace the row if it exists
+ */
+int db_redis_replace(const db1_con_t* _h, const db_key_t* _k,
+        const db_val_t* _v, const int _n, const int _un, const int _m) {
+    LM_DBG("perform redis replace\n");
+    return -1;
+}
+
+/*
+ * Store name of table that will be used by
+ * subsequent database functions
+ */
+int db_redis_use_table(db1_con_t* _h, const str* _t) {
+    return db_use_table(_h, _t);
+}
+
+int db_redis_free_result(db1_con_t* _h, db1_res_t* _r) {
+    LM_DBG("perform redis free result\n");
+
+    if(!_r)
+        return -1;
+    db_free_result(_r);
+    return 0;
+}

+ 88 - 0
src/modules/db_redis/redis_dbase.h

@@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2018 Andreas Granig (sipwise.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ */
+
+
+#ifndef _REDIS_DBASE_H_
+#define _REDIS_DBASE_H_
+
+#include "db_redis_mod.h"
+
+/*
+ * Initialize database connection
+ */
+db1_con_t* db_redis_init(const str* _sqlurl);
+
+/*
+ * Close a database connection
+ */
+void db_redis_close(db1_con_t* _h);
+
+/*
+ * Free all memory allocated by get_result
+ */
+int db_redis_free_result(db1_con_t* _h, db1_res_t* _r);
+
+/*
+ * Do a query
+ */
+int db_redis_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op, const db_val_t* _v,
+const db_key_t* _c, const int _n, const int _nc, const db_key_t _o, db1_res_t** _r);
+
+/*
+ * Fetch rows from a result
+ */
+int db_redis_fetch_result(const db1_con_t* _h, db1_res_t** _r, const int nrows);
+
+/*
+ * Raw SQL query
+ */
+int db_redis_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r);
+
+/*
+ * Insert a row into table
+ */
+int db_redis_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, const int _n);
+
+/*
+ * Delete a row from table
+ */
+int db_redis_delete(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o, const db_val_t* _v,
+const int _n);
+
+/*
+ * Update a row in table
+ */
+int db_redis_update(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o, const db_val_t* _v,
+const db_key_t* _uk, const db_val_t* _uv, const int _n, const int _un);
+
+/*
+ * Just like insert, but replace the row if it exists
+ */
+int db_redis_replace(const db1_con_t* handle, const db_key_t* keys, const db_val_t* vals,
+		const int n, const int _un, const int _m);
+
+/*
+ * Store name of table that will be used by
+ * subsequent database functions
+ */
+int db_redis_use_table(db1_con_t* _h, const str* _t);
+
+#endif  /* _REDIS_BASE_H_ */

+ 692 - 0
src/modules/db_redis/redis_table.c

@@ -0,0 +1,692 @@
+/*
+ * Copyright (C) 2018 Andreas Granig (sipwise.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ */
+
+#include <stdlib.h>
+
+#include "db_redis_mod.h"
+#include "redis_connection.h"
+#include "redis_table.h"
+
+int db_redis_key_add_string(redis_key_t **list, const char* entry, int len) {
+    redis_key_t *k;
+
+
+    k = (redis_key_t*)pkg_malloc(sizeof(redis_key_t));
+    if (!k) {
+        LM_ERR("Failed to allocate memory for key list entry\n");
+        goto err;
+    }
+    k->next = NULL;
+
+    k->key.s = (char*)pkg_malloc((len+1) * sizeof(char));
+    if (!k->key.s) {
+        LM_ERR("Failed to allocate memory for key list entry\n");
+        goto err;
+    }
+
+    memcpy(k->key.s, entry, len);
+    k->key.s[len] = '\0';
+    k->key.len = len;
+
+    if (!*list) {
+        *list = k;
+    } else {
+        redis_key_t *l = *list;
+        while (l->next)
+            l = l->next;
+        l->next = k;
+    }
+
+    return 0;
+
+err:
+    if (k)
+        pkg_free(k);
+    return -1;
+}
+
+int db_redis_key_add_str(redis_key_t **list, const str* entry) {
+    return db_redis_key_add_string(list, entry->s, entry->len);
+}
+
+int db_redis_key_prepend_string(redis_key_t **list, const char* entry, int len) {
+    redis_key_t *k;
+
+    k = (redis_key_t*)pkg_malloc(sizeof(redis_key_t));
+    if (!k) {
+        LM_ERR("Failed to allocate memory for key list entry\n");
+        goto err;
+    }
+    k->next = NULL;
+
+    k->key.s = (char*)pkg_malloc((len+1) * sizeof(char));
+    if (!k->key.s) {
+        LM_ERR("Failed to allocate memory for key list entry\n");
+        goto err;
+    }
+
+    memset(k->key.s, 0, len+1);
+    strncpy(k->key.s, entry, len);
+    k->key.len = len;
+
+    if (!*list) {
+        *list = k;
+    } else {
+        k->next = *list;
+        *list = k;
+    }
+
+    return 0;
+
+err:
+    if (k)
+        pkg_free(k);
+    return -1;
+}
+
+redis_key_t * db_redis_key_unshift(redis_key_t **list) {
+    redis_key_t *k;
+
+    k = *list;
+    *list = (*list)->next;
+    k->next = NULL;
+    return k;
+}
+
+void db_redis_key_free(redis_key_t **list) {
+    redis_key_t *l;
+    redis_key_t **head;
+
+    if (!list || !*list) {
+        return;
+    }
+    head = list;
+    do {
+        l = (*list)->next;
+        if ((*list)->key.s) {
+            pkg_free((*list)->key.s);
+            (*list)->key.s = NULL;
+            (*list)->key.len = 0;
+        }
+        pkg_free(*list);
+        *list = l;
+    } while (l);
+    *head = NULL;
+}
+
+int db_redis_key_list2arr(redis_key_t *list, char ***arr) {
+    int len = 0, i = 0;
+    redis_key_t *tmp = NULL;
+
+    *arr = NULL;
+    for (tmp = list, len = 0; tmp; tmp = tmp->next, len++);
+    if (len < 1) {
+        return 0;
+    }
+
+    *arr = (char**)pkg_malloc(len * sizeof (char*));
+    if (!*arr) {
+        LM_ERR("Failed to allocate memory for array\n");
+        return -1;
+    }
+    for (tmp = list, i = 0; tmp; tmp = tmp->next, i++) {
+        (*arr)[i] = tmp->key.s;
+        LM_DBG("++++++ assign #%d (%s) from list to array\n", i, (*arr)[i]);
+    }
+    LM_DBG("returning %d entries\n", len);
+
+    return len;
+}
+
+
+int db_redis_schema_get_column_type(km_redis_con_t *con, const str *table_name, const str *col_name) {
+    struct str_hash_entry *table_e;
+    struct str_hash_entry *col_e;
+    redis_table_t *table;
+
+    table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
+    if (!table_e) {
+        LM_ERR("Failed to find table '%.*s' in table hash\n",
+                table_name->len, table_name->s);
+        return -1;
+    }
+    table = (redis_table_t*)table_e->u.p;
+    col_e = str_hash_get(&table->columns, col_name->s, col_name->len);
+    if (!col_e) {
+        LM_ERR("Failed to find column '%.*s' in schema for table '%.*s'\n",
+                col_name->len, col_name->s,
+                table_name->len, table_name->s);
+        return -1;
+    }
+    return col_e->u.n;
+}
+
+void db_redis_print_all_tables(km_redis_con_t *con) {
+    struct str_hash_table *ht;
+    struct str_hash_table *col_ht;
+    struct str_hash_entry *he;
+    struct str_hash_entry *col_he;
+    struct str_hash_entry *last;
+    struct str_hash_entry *col_last;
+    redis_table_t *table;
+    redis_key_t *key;
+    redis_type_t *type;
+
+    LM_DBG("dumping all redis tables:\n");
+    ht = &con->tables;
+
+    for (int i = 0; i < ht->size; ++i) {
+        last = (&ht->table[i])->prev;
+        clist_foreach(&ht->table[i], he, next) {
+            LM_DBG("  table %.*s\n", he->key.len, he->key.s);
+            table = (redis_table_t*) he->u.p;
+
+            LM_DBG("    schema:\n");
+            col_ht = &table->columns;
+            for (int j = 0; j < col_ht->size; ++j) {
+                col_last = (&col_ht->table[j])->prev;
+                clist_foreach(&col_ht->table[j], col_he, next) {
+                    LM_DBG("      %.*s: %d\n",
+                            col_he->key.len, col_he->key.s, col_he->u.n);
+                    if (col_he == col_last) break;
+                }
+            }
+
+            LM_DBG("    entry keys:\n");
+            key = table->entry_keys;
+            while (key) {
+                LM_DBG("      %.*s\n", key->key.len, key->key.s);
+                key = key->next;
+            }
+
+            type = table->types;
+            while (type) {
+                LM_DBG("    %.*s keys:\n", type->type.len, type->type.s);
+                key = type->keys;
+                while (key) {
+                    LM_DBG("      %.*s\n", key->key.len, key->key.s);
+                    key = key->next;
+                }
+                type = type->next;
+            }
+
+            if (he == last) break;
+        }
+    }
+}
+
+void db_redis_print_table(km_redis_con_t *con, char *name) {
+    str table_name;
+    struct str_hash_entry *table_entry;
+    redis_table_t *table;
+    redis_key_t *key;
+    redis_type_t *type;
+
+    struct str_hash_table *col_ht;
+    struct str_hash_entry *col_he;
+    struct str_hash_entry *col_last;
+
+    table_name.s = name;
+    table_name.len = strlen(name);
+
+    table_entry = str_hash_get(&con->tables, table_name.s, table_name.len);
+    if (!table_entry) {
+        LM_ERR("Failed to print table '%.*s', no such table entry found\n",
+                table_name.len, table_name.s);
+        return;
+    }
+
+    table = (redis_table_t*) table_entry->u.p;
+    LM_DBG("table %.*s:\n", table_name.len, table_name.s);
+
+    LM_DBG("  schema:\n");
+    col_ht = &table->columns;
+    for (int j = 0; j < col_ht->size; ++j) {
+        col_last = (&col_ht->table[j])->prev;
+        clist_foreach(&col_ht->table[j], col_he, next) {
+            LM_DBG("    %.*s: %d\n",
+                    col_he->key.len, col_he->key.s, col_he->u.n);
+            if (col_he == col_last) break;
+        }
+    }
+
+    LM_DBG("  entry keys:\n");
+    key = table->entry_keys;
+    while (key) {
+        LM_DBG("    %.*s\n", key->key.len, key->key.s);
+        key = key->next;
+    }
+
+    type = table->types;
+    while (type) {
+        LM_DBG("  %.*s keys:\n", type->type.len, type->type.s);
+        key = type->keys;
+        while (key) {
+            LM_DBG("    %.*s\n", key->key.len, key->key.s);
+            key = key->next;
+        }
+        type = type->next;
+    }
+}
+
+void db_redis_free_tables(km_redis_con_t *con) {
+    // TODO: also free schema hash?
+    struct str_hash_table *ht;
+    struct str_hash_table *col_ht;
+    struct str_hash_entry *he;
+    struct str_hash_entry *col_he;
+    struct str_hash_entry *last;
+    struct str_hash_entry *col_last;
+    redis_table_t *table;
+    redis_key_t *key, *tmpkey;
+    redis_type_t *type, *tmptype;
+
+    ht = &con->tables;
+    for (int i = 0; i < ht->size; ++i) {
+        last = (&ht->table[i])->prev;
+        clist_foreach(&ht->table[i], he, next) {
+            table = (redis_table_t*) he->u.p;
+
+            col_ht = &table->columns;
+            for (int j = 0; j < col_ht->size; ++j) {
+                col_last = (&col_ht->table[j])->prev;
+                clist_foreach(&col_ht->table[j], col_he, next) {
+                    pkg_free(col_he);
+                    if (col_he == col_last) break;
+                }
+            }
+            pkg_free(col_ht->table);
+
+            key = table->entry_keys;
+            while (key) {
+                tmpkey = key;
+                key = key->next;
+                pkg_free(tmpkey);
+            }
+
+            type = table->types;
+            while (type) {
+                key = type->keys;
+                while (key) {
+                    tmpkey = key;
+                    key = key->next;
+                    pkg_free(tmpkey);
+                }
+                tmptype = type;
+                type = type->next;
+                pkg_free(tmptype);
+            }
+            pkg_free(table);
+            pkg_free(he);
+
+            if (he == last) break;
+        }
+    }
+    pkg_free(ht->table);
+}
+
+static redis_key_t* db_redis_create_key(str *key) {
+    redis_key_t *e;
+    e = (redis_key_t*) pkg_malloc(sizeof(redis_key_t));
+    if (!e) {
+        LM_ERR("Failed to allocate memory for key entry\n");
+        return NULL;
+    }
+    memset(e, 0, sizeof(redis_key_t));
+    e->key.s = key->s;
+    e->key.len = key->len;
+    return e;
+}
+
+static redis_type_t* db_redis_create_type(str *type) {
+    redis_type_t *e;
+    e = (redis_type_t*) pkg_malloc(sizeof(redis_type_t));
+    if (!e) {
+        LM_ERR("Failed to allocate memory for table type\n");
+        return NULL;
+    }
+    e->type.s = type->s;
+    e->type.len = type->len;
+    e->next = NULL;
+    e->keys = NULL;
+    return e;
+}
+
+static struct str_hash_entry* db_redis_create_table(str *table) {
+    struct str_hash_entry *e;
+    redis_table_t *t;
+    e = (struct str_hash_entry*) pkg_malloc(sizeof(struct str_hash_entry));
+    if (!e) {
+        LM_ERR("Failed to allocate memory for table entry\n");
+        return NULL;
+    }
+    memset(e, 0, sizeof(struct str_hash_entry));
+
+    e->key.s = table->s;
+    e->key.len = table->len;
+    e->flags = 0;
+
+    t = (redis_table_t*) pkg_malloc(sizeof(redis_table_t));
+    if (!t) {
+        LM_ERR("Failed to allocate memory for table data\n");
+        pkg_free(e);
+        return NULL;
+    }
+    t->entry_keys = NULL;
+    t->types = NULL;
+
+    if (str_hash_alloc(&t->columns, REDIS_HT_SIZE) != 0) {
+        LM_ERR("Failed to allocate memory for table schema hashtable\n");
+        pkg_free(e);
+        return NULL;
+    }
+    str_hash_init(&t->columns);
+
+    e->u.p = t;
+    return e;
+}
+
+static struct str_hash_entry* db_redis_create_column(str *col, str *type) {
+    struct str_hash_entry *e;
+    e = (struct str_hash_entry*) pkg_malloc(sizeof(struct str_hash_entry));
+    if (!e) {
+        LM_ERR("Failed to allocate memory for column entry\n");
+        return NULL;
+    }
+    e->key.s = col->s;
+    e->key.len = col->len;
+    e->flags = 0;
+    switch (type->s[0]) {
+        case 's':
+        case 'S':
+            e->u.n = DB1_STRING;
+            break;
+        case 'i':
+        case 'I':
+            e->u.n = DB1_INT;
+            break;
+        case 't':
+        case 'T':
+            e->u.n = DB1_DATETIME;
+            break;
+        case 'd':
+        case 'D':
+            e->u.n = DB1_DOUBLE;
+            break;
+        case 'b':
+        case 'B':
+            e->u.n = DB1_BLOB;
+            break;
+        default:
+            LM_ERR("Invalid schema column type '%.*s', expecting one of string, int, timestamp, double, blob\n",
+                    type->len, type->s);
+            pkg_free(e);
+            return NULL;
+    }
+    return e;
+}
+
+int db_redis_parse_keys(km_redis_con_t *con) {
+    char *p;
+    char *start;
+    char *end;
+
+    str table_name;
+    str type_name;
+    str column_name;
+
+    struct str_hash_entry *table_entry;
+    redis_table_t *table;
+    redis_type_t *type;
+    redis_type_t *type_target;
+    redis_key_t *key;
+    redis_key_t **key_target;
+    redis_key_t *key_location;
+
+    enum {
+        DBREDIS_KEYS_TABLE_ST,
+        DBREDIS_KEYS_TYPE_ST,
+        DBREDIS_KEYS_COLUMN_ST,
+        DBREDIS_KEYS_END_ST
+    } state;
+
+    //LM_DBG("parsing keys '%.*s'\n", redis_keys.len, redis_keys.s);
+    if (!redis_keys.len) {
+        LM_ERR("Failed to parse empty 'keys' mod-param, please define it!\n");
+        return -1;
+    }
+
+    end = redis_keys.s + redis_keys.len;
+    p = start = redis_keys.s;
+    state = DBREDIS_KEYS_TABLE_ST;
+    do {
+        switch(state) {
+            case DBREDIS_KEYS_TABLE_ST:
+                while(p != end && *p != '=')
+                    ++p;
+                if (p == end) {
+                    LM_ERR("Invalid table definition, expecting <table>=<definition>\n");
+                    goto err;
+                }
+                table_name.s = start;
+                table_name.len = p - start;
+                state = DBREDIS_KEYS_TYPE_ST;
+                start = ++p;
+                //LM_DBG("found table name '%.*s'\n", table_name.len, table_name.s);
+
+                table_entry = str_hash_get(&con->tables, table_name.s, table_name.len);
+                if (!table_entry) {
+                    LM_ERR("No table schema found for table '%.*s', fix config by adding one to the 'schema' mod-param!\n",
+                            table_name.len, table_name.s);
+                    goto err;
+                }
+                table = table_entry->u.p;
+                break;
+            case DBREDIS_KEYS_TYPE_ST:
+                while(p != end && *p != ':')
+                    ++p;
+                if (p == end) {
+                    LM_ERR("Invalid type definition, expecting <type>:<definition>\n");
+                    goto err;
+                }
+                type_name.s = start;
+                type_name.len = p - start;
+                state = DBREDIS_KEYS_COLUMN_ST;
+                start = ++p;
+                //LM_DBG("found type name '%.*s' for table '%.*s'\n",
+                //        type_name.len, type_name.s,
+                //        table_name.len, table_name.s);
+                if (type_name.len == REDIS_DIRECT_PREFIX_LEN &&
+                        !strncmp(type_name.s, REDIS_DIRECT_PREFIX, type_name.len)) {
+                    key_target = &table->entry_keys;
+                } else {
+                    type = db_redis_create_type(&type_name);
+                    if (!type) goto err;
+                    if (!table->types) {
+                        table->types = type_target = type;
+                    } else {
+                        type_target->next = type;
+                        type_target = type_target->next;
+                    }
+                    key_target = &type->keys;
+                }
+                break;
+            case DBREDIS_KEYS_COLUMN_ST:
+                while(p != end && *p != ',' && *p != '&' && *p != ';')
+                    ++p;
+                if (p == end) {
+                    state = DBREDIS_KEYS_END_ST;
+                } else if (*p == ',') {
+                    state = DBREDIS_KEYS_COLUMN_ST;
+                } else if (*p == '&') {
+                    state = DBREDIS_KEYS_TYPE_ST;
+                } else if (*p == ';') {
+                    state = DBREDIS_KEYS_TABLE_ST;
+                }
+                column_name.s = start;
+                column_name.len = p - start;
+                start = ++p;
+                //LM_DBG("found column name '%.*s' in type '%.*s' for table '%.*s'\n",
+                //        column_name.len, column_name.s,
+                //        type_name.len, type_name.s,
+                //        table_name.len, table_name.s);
+
+                key = db_redis_create_key(&column_name);
+                if (!key) goto err;
+                if (*key_target == NULL) {
+                    *key_target = key_location = key;
+                } else {
+                    key_location->next = key;
+                    key_location = key_location->next;
+                }
+                break;
+            case DBREDIS_KEYS_END_ST:
+                //LM_DBG("done parsing keys definition\n");
+                return 0;
+
+
+        }
+    } while (p != end);
+
+    return 0;
+
+err:
+    db_redis_free_tables(con);
+    return -1;
+}
+
+
+int db_redis_parse_schema(km_redis_con_t *con) {
+    char *p;
+    char *start;
+    char *end;
+
+    str table_name;
+    str column_name;
+    str type_name;
+
+    struct str_hash_entry *table_entry;
+    struct str_hash_entry *column_entry;
+    redis_table_t *table;
+
+    enum {
+        DBREDIS_SCHEMA_TABLE_ST,
+        DBREDIS_SCHEMA_COLUMN_ST,
+        DBREDIS_SCHEMA_TYPE_ST,
+        DBREDIS_SCHEMA_END_ST
+    } state;
+
+    //LM_DBG("parsing schema '%.*s'\n", redis_schema.len, redis_schema.s);
+    if (!redis_schema.len) {
+        LM_ERR("Failed to parse empty 'schema' mod-param, please define it!\n");
+        return -1;
+    }
+
+    if (str_hash_alloc(&con->tables, REDIS_HT_SIZE) != 0) {
+        LM_ERR("Failed to allocate memory for tables hashtable\n");
+        goto err;
+    }
+    str_hash_init(&con->tables);
+
+    end = redis_schema.s + redis_schema.len;
+    p = start = redis_schema.s;
+    state = DBREDIS_SCHEMA_TABLE_ST;
+    do {
+        switch(state) {
+            case DBREDIS_SCHEMA_TABLE_ST:
+                while(p != end && *p != '=')
+                    ++p;
+                if (p == end) {
+                    LM_ERR("Invalid table definition, expecting <table>=<definition>\n");
+                    goto err;
+                }
+                table_name.s = start;
+                table_name.len = p - start;
+                state = DBREDIS_SCHEMA_COLUMN_ST;
+                start = ++p;
+                //LM_DBG("found table name '%.*s'\n", table_name.len, table_name.s);
+
+                table_entry = str_hash_get(&con->tables, table_name.s, table_name.len);
+                if (table_entry) {
+                    LM_ERR("Found duplicate table schema definition '%.*s', fix config by removing one from the 'schema' mod-param!\n",
+                            table_name.len, table_name.s);
+                    goto err;
+                }
+                table_entry = db_redis_create_table(&table_name);
+                if (!table_entry) goto err;
+                str_hash_add(&con->tables, table_entry);
+                table = table_entry->u.p;
+                break;
+            case DBREDIS_SCHEMA_COLUMN_ST:
+                while(p != end && *p != '/')
+                    ++p;
+                if (p == end) {
+                    LM_ERR("Invalid column definition, expecting <column>/<type>\n");
+                    goto err;
+                }
+                column_name.s = start;
+                column_name.len = p - start;
+                state = DBREDIS_SCHEMA_TYPE_ST;
+                start = ++p;
+                //LM_DBG("found column name '%.*s'\n", column_name.len, column_name.s);
+                break;
+            case DBREDIS_SCHEMA_TYPE_ST:
+                while(p != end && *p != ',' && *p != ';')
+                    ++p;
+                type_name.s = start;
+                type_name.len = p - start;
+                if (p == end) {
+                    state = DBREDIS_SCHEMA_END_ST;
+                } else if (*p == ';') {
+                    state = DBREDIS_SCHEMA_TABLE_ST;
+                } else {
+                    state = DBREDIS_SCHEMA_COLUMN_ST;
+                }
+                start = ++p;
+                //LM_DBG("found column type '%.*s' for column name '%.*s' in table '%.*s'\n",
+                //        type_name.len, type_name.s,
+                //        column_name.len, column_name.s,
+                //        table_name.len, table_name.s);
+
+                column_entry = str_hash_get(&table->columns, column_name.s, column_name.len);
+                if (column_entry) {
+                    LM_ERR("Found duplicate column definition '%.*s' in schema definition of table '%.*s', remove one from mod-param 'schema'!\n",
+                            column_name.len, column_name.s,
+                            table_name.len, table_name.s);
+                    goto err;
+                }
+                column_entry = db_redis_create_column(&column_name, &type_name);
+                if (!column_entry) {
+                    goto err;
+                }
+                str_hash_add(&table->columns, column_entry);
+                break;
+            case DBREDIS_SCHEMA_END_ST:
+                //LM_DBG("done parsing redis table schema\n");
+                return 0;
+
+        }
+    } while (p != end);
+
+    return 0;
+err:
+    db_redis_free_tables(con);
+    return -1;
+}

+ 63 - 0
src/modules/db_redis/redis_table.h

@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2018 Andreas Granig (sipwise.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ */
+
+
+#ifndef _REDIS_TABLE_H_
+#define _REDIS_TABLE_H_
+
+#include "db_redis_mod.h"
+#include "redis_connection.h"
+
+typedef struct redis_key redis_key_t;
+struct redis_key {
+    str key;
+    redis_key_t *next;
+};
+
+typedef struct redis_type redis_type_t;
+struct redis_type {
+    str type;
+    redis_type_t *next;
+    redis_key_t *keys;
+};
+
+typedef struct redis_table redis_table_t;
+struct redis_table {
+    redis_key_t *entry_keys;
+    redis_type_t *types;
+    struct str_hash_table columns;
+};
+
+int db_redis_schema_get_column_type(km_redis_con_t *con, const str *table_name, const str *col_name);
+void db_redis_print_all_tables(km_redis_con_t *con);
+void db_redis_print_table(km_redis_con_t *con, char *name);
+void db_redis_free_tables(km_redis_con_t *con);
+int db_redis_parse_schema(km_redis_con_t *con);
+int db_redis_parse_keys(km_redis_con_t *con);
+
+int db_redis_key_add_string(redis_key_t* *list, const char* entry, int len);
+int db_redis_key_add_str(redis_key_t **list, const str* entry);
+int db_redis_key_prepend_string(redis_key_t **list, const char* entry, int len);
+int db_redis_key_list2arr(redis_key_t *list, char ***arr);
+redis_key_t * db_redis_key_unshift(redis_key_t **list);
+void db_redis_key_free(redis_key_t **list);
+
+#endif /* _REDIS_TABLE_H_ */