|
@@ -0,0 +1,2128 @@
|
|
|
+/*
|
|
|
+ * Copyright (C) 2018 Andreas Granig (sipwise.com)
|
|
|
+ *
|
|
|
+ * This file is part of Kamailio, a free SIP server.
|
|
|
+ *
|
|
|
+ * Kamailio is free software; you can redistribute it and/or modify
|
|
|
+ * it under the terms of the GNU General Public License as published by
|
|
|
+ * the Free Software Foundation; either version 2 of the License, or
|
|
|
+ * (at your option) any later version
|
|
|
+ *
|
|
|
+ * Kamailio is distributed in the hope that it will be useful,
|
|
|
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
+ * GNU General Public License for more details.
|
|
|
+ *
|
|
|
+ * You should have received a copy of the GNU General Public License
|
|
|
+ * along with this program; if not, write to the Free Software
|
|
|
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
+ *
|
|
|
+ */
|
|
|
+
|
|
|
+#include <stdlib.h>
|
|
|
+#include <time.h>
|
|
|
+
|
|
|
+#include "db_redis_mod.h"
|
|
|
+#include "redis_connection.h"
|
|
|
+#include "redis_dbase.h"
|
|
|
+#include "redis_table.h"
|
|
|
+
|
|
|
+static void db_redis_dump_reply(redisReply *reply) {
|
|
|
+ if (reply->type == REDIS_REPLY_STRING) {
|
|
|
+ LM_DBG("%s\n", reply->str);
|
|
|
+ } else if (reply->type == REDIS_REPLY_INTEGER) {
|
|
|
+ LM_DBG("%lld\n", reply->integer);
|
|
|
+ } else if (reply->type == REDIS_REPLY_NIL) {
|
|
|
+ LM_DBG("<null>\n");
|
|
|
+ } else if (reply->type == REDIS_REPLY_ARRAY) {
|
|
|
+ LM_DBG("printing %lu elements in array reply\n", reply->elements);
|
|
|
+ for(int i = 0; i < reply->elements; ++i) {
|
|
|
+ db_redis_dump_reply(reply->element[i]);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LM_DBG("not printing invalid reply type\n");
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// TODO: utilize auto-expiry? on insert/update, also update expire value
|
|
|
+// of mappings
|
|
|
+
|
|
|
+/*
|
|
|
+ * Initialize database module
|
|
|
+ * No function should be called before this
|
|
|
+ */
|
|
|
+db1_con_t* db_redis_init(const str* _url) {
|
|
|
+ return db_do_init(_url, (void *)db_redis_new_connection);
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ * Shut down database module
|
|
|
+ * No function should be called after this
|
|
|
+ */
|
|
|
+void db_redis_close(db1_con_t* _h) {
|
|
|
+ LM_DBG("closing redis db connection\n");
|
|
|
+ db_do_close(_h, db_redis_free_connection);
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+static db1_res_t* db_redis_new_result(void) {
|
|
|
+ db1_res_t* obj;
|
|
|
+
|
|
|
+ obj = db_new_result();
|
|
|
+ if (!obj)
|
|
|
+ return NULL;
|
|
|
+ return obj;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_val2str(const db_val_t *v, str *_str) {
|
|
|
+ const char *s;
|
|
|
+ const str *tmpstr;
|
|
|
+ int vtype = VAL_TYPE(v);
|
|
|
+ _str->s = NULL;
|
|
|
+ _str->len = 32; // default for numbers
|
|
|
+
|
|
|
+ if (VAL_NULL(v)) {
|
|
|
+ LM_DBG("converting <null> value to str\n");
|
|
|
+ _str->len = 0;
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (vtype) {
|
|
|
+ case DB1_INT:
|
|
|
+ LM_DBG("converting int value %d to str\n", VAL_INT(v));
|
|
|
+ _str->s = (char*)pkg_malloc(_str->len);
|
|
|
+ if (!_str->s) goto memerr;
|
|
|
+ snprintf(_str->s, _str->len, "%d", VAL_INT(v));
|
|
|
+ _str->len = strlen(_str->s);
|
|
|
+ break;
|
|
|
+ case DB1_BIGINT:
|
|
|
+ LM_DBG("converting bigint value %lld to str\n", VAL_BIGINT(v));
|
|
|
+ _str->s = (char*)pkg_malloc(_str->len);
|
|
|
+ if (!_str->s) goto memerr;
|
|
|
+ snprintf(_str->s, _str->len, "%lld", VAL_BIGINT(v));
|
|
|
+ _str->len = strlen(_str->s);
|
|
|
+ break;
|
|
|
+ case DB1_STRING:
|
|
|
+ s = VAL_STRING(v);
|
|
|
+ _str->len = strlen(s);
|
|
|
+ LM_DBG("converting string value '%s' with len %d to str\n", s, _str->len);
|
|
|
+ _str->s = (char*)pkg_malloc(_str->len + 1);
|
|
|
+ if (!_str->s) goto memerr;
|
|
|
+ //memcpy(_str->s, s, _str->len);
|
|
|
+ //_str->s[_str->len] = '\0';
|
|
|
+ memset(_str->s, 0, _str->len + 1);
|
|
|
+ strncpy(_str->s, s, _str->len);
|
|
|
+ break;
|
|
|
+ case DB1_STR:
|
|
|
+ tmpstr = &(VAL_STR(v));
|
|
|
+ LM_DBG("converting str value '%.*s' with len %d to str\n", tmpstr->len, tmpstr->s, tmpstr->len);
|
|
|
+ // copy manually to add 0 termination
|
|
|
+ _str->s = (char*)pkg_malloc(tmpstr->len + 1);
|
|
|
+ if (!_str->s) goto memerr;
|
|
|
+ _str->len = tmpstr->len;
|
|
|
+ memcpy(_str->s, tmpstr->s, _str->len);
|
|
|
+ _str->s[_str->len] = '\0';
|
|
|
+ break;
|
|
|
+ case DB1_DATETIME:
|
|
|
+ LM_DBG("converting datetime value %ld to str\n", VAL_TIME(v));
|
|
|
+ _str->s = (char*)pkg_malloc(_str->len);
|
|
|
+ if (!_str->s) goto memerr;
|
|
|
+ strftime(_str->s, _str->len, "%Y-%m-%d %H:%M:%S", localtime(&(VAL_TIME(v))));
|
|
|
+ _str->len = strlen(_str->s);
|
|
|
+ break;
|
|
|
+ case DB1_DOUBLE:
|
|
|
+ LM_DBG("converting double value %f to str\n", VAL_DOUBLE(v));
|
|
|
+ _str->s = (char*)pkg_malloc(_str->len);
|
|
|
+ if (!_str->s) goto memerr;
|
|
|
+ snprintf(_str->s, _str->len, "%.6f", VAL_DOUBLE(v));
|
|
|
+ _str->len = strlen(_str->s);
|
|
|
+ break;
|
|
|
+ case DB1_BITMAP:
|
|
|
+ LM_DBG("converting bitmap value %u to str\n", VAL_BITMAP(v));
|
|
|
+ _str->s = (char*)pkg_malloc(_str->len);
|
|
|
+ if (!_str->s) goto memerr;
|
|
|
+ snprintf(_str->s, _str->len, "%u", VAL_BITMAP(v));
|
|
|
+ _str->len = strlen(_str->s);
|
|
|
+ break;
|
|
|
+ case DB1_BLOB:
|
|
|
+ default:
|
|
|
+ LM_ERR("Unsupported val type %d\n", vtype);
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+memerr:
|
|
|
+ LM_ERR("Failed to allocate memory to convert value to string\n");
|
|
|
+err:
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_return_version(const db1_con_t* _h, km_redis_con_t *con, const str *table_name,
|
|
|
+ db1_res_t **_r) {
|
|
|
+
|
|
|
+ struct str_hash_entry *table_e;
|
|
|
+ redis_table_t *table;
|
|
|
+ db_val_t* dval;
|
|
|
+ db_row_t* drow;
|
|
|
+
|
|
|
+ LM_DBG("get table version\n");
|
|
|
+
|
|
|
+ table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
|
|
|
+ if (!table_e) {
|
|
|
+ LM_ERR("query to undefined table '%.*s', define it in schema file!\n",
|
|
|
+ table_name->len, table_name->s);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ table = (redis_table_t*)table_e->u.p;
|
|
|
+
|
|
|
+ *_r = db_redis_new_result();
|
|
|
+ if (!*_r) {
|
|
|
+ LM_ERR("Failed to allocate memory for result");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ RES_NUM_ROWS(*_r) = 1;
|
|
|
+ RES_COL_N(*_r) = 1;
|
|
|
+ RES_ROW_N(*_r) = 1;
|
|
|
+ if (db_allocate_rows(*_r) != 0) {
|
|
|
+ LM_ERR("Failed to allocate memory for rows\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (db_allocate_columns(*_r, 1) != 0) {
|
|
|
+ LM_ERR("Failed to allocate memory for result columns");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+
|
|
|
+ drow = &(RES_ROWS(*_r)[0]);
|
|
|
+
|
|
|
+ if (db_allocate_row(*_r, drow) != 0) {
|
|
|
+ LM_ERR("Failed to allocate row %d\n", RES_NUM_ROWS(*_r));
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+
|
|
|
+ dval = &(ROW_VALUES(drow)[0]);
|
|
|
+
|
|
|
+ VAL_TYPE(dval) = DB1_INT;
|
|
|
+ VAL_NULL(dval) = 0;
|
|
|
+ VAL_INT(dval) = table->version;
|
|
|
+
|
|
|
+ LM_DBG("returning short-cut table version %d for table '%.*s'",
|
|
|
+ table->version, table_name->len, table_name->s);
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+err:
|
|
|
+ if (*_r) db_redis_free_result((db1_con_t*)_h, *_r);
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_build_entry_manual_keys(redis_table_t *table, const db_key_t *_k, const db_val_t *_v, const int _n, int **manual_keys, int *manual_key_count) {
|
|
|
+
|
|
|
+ // TODO: we also put keys here which are already part of type mapping!
|
|
|
+ // there must be removed for performance reasons
|
|
|
+
|
|
|
+ redis_key_t *key = NULL;
|
|
|
+
|
|
|
+ *manual_keys = (int*)pkg_malloc(_n * sizeof(int));
|
|
|
+ if (! *manual_keys) {
|
|
|
+ LM_ERR("Failed to allocate memory for manual key indices\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ memset(*manual_keys, 0, _n * sizeof(int));
|
|
|
+ *manual_key_count = 0;
|
|
|
+
|
|
|
+ for (key = table->entry_keys; key; key = key->next) {
|
|
|
+ int subkey_found = 0;
|
|
|
+ LM_DBG("checking for existence of entry key '%.*s' in query to get manual key\n",
|
|
|
+ key->key.len, key->key.s);
|
|
|
+ for (int i = 0; i < _n; ++i) {
|
|
|
+ const db_key_t k = _k[i];
|
|
|
+ if (!str_strcmp(&key->key, (str*)k)) {
|
|
|
+ LM_DBG("found key in entry key\n");
|
|
|
+ subkey_found = 1;
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ (*manual_keys)[*manual_key_count] = i;
|
|
|
+ (*manual_key_count)++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!subkey_found) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+
|
|
|
+err:
|
|
|
+ if (*manual_keys) {
|
|
|
+ pkg_free(*manual_keys);
|
|
|
+ *manual_keys = NULL;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_find_query_key(redis_key_t *key, const str *table_name, str *type_name, const db_key_t *_k, const db_val_t *_v, const int _n, str *key_name, int *key_found) {
|
|
|
+
|
|
|
+ unsigned int len;
|
|
|
+ str val = {NULL, 0};
|
|
|
+
|
|
|
+ *key_found = 1;
|
|
|
+ key_name->len = 0;
|
|
|
+ key_name->s = NULL;
|
|
|
+
|
|
|
+ for (; key; key = key->next) {
|
|
|
+ int subkey_found = 0;
|
|
|
+ LM_DBG("checking for existence of entry key '%.*s' in query\n",
|
|
|
+ key->key.len, key->key.s);
|
|
|
+ for (int i = 0; i < _n; ++i) {
|
|
|
+ const db_key_t k = _k[i];
|
|
|
+ const db_val_t v = _v[i];
|
|
|
+
|
|
|
+ if (VAL_NULL(&v)) {
|
|
|
+ LM_DBG("Skipping null value for given key '%.*s'\n",
|
|
|
+ k->len, k->s);
|
|
|
+ break;
|
|
|
+ } else if (!str_strcmp(&key->key, (str*)k)) {
|
|
|
+ LM_DBG("found key in entry key\n");
|
|
|
+ if (db_redis_val2str(&v, &val) != 0) goto err;
|
|
|
+ if (val.s == NULL) {
|
|
|
+ LM_DBG("key value in entry key is null, skip key\n");
|
|
|
+ subkey_found = 0;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (!key_name->len) {
|
|
|
+ // <table_name>:<type>::<val>
|
|
|
+ len = table_name->len + 1 + type_name->len + 2 + val.len + 1; //snprintf writes term 0 char
|
|
|
+ key_name->s = (char*)pkg_malloc(len);
|
|
|
+ if (!key_name->s) {
|
|
|
+ LM_ERR("Failed to allocate key memory\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ snprintf(key_name->s, len, "%.*s:%.*s::%.*s",
|
|
|
+ table_name->len, table_name->s,
|
|
|
+ type_name->len, type_name->s,
|
|
|
+ val.len, val.s);
|
|
|
+ key_name->len = len-1; // subtract the term 0 char
|
|
|
+
|
|
|
+ } else {
|
|
|
+ // :<val>
|
|
|
+ key_name->s = (char*)pkg_realloc(key_name->s, key_name->len + val.len + 2);
|
|
|
+ if (!key_name->s) {
|
|
|
+ LM_ERR("Failed to allocate key memory\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ snprintf(key_name->s + key_name->len, 1 + val.len + 1, ":%.*s",
|
|
|
+ val.len, val.s);
|
|
|
+ key_name->len += (1 + val.len);
|
|
|
+ }
|
|
|
+ LM_DBG("entry key so far is '%.*s'\n", key_name->len, key_name->s);
|
|
|
+ subkey_found = 1;
|
|
|
+ pkg_free(val.s);
|
|
|
+ val.s = NULL;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!subkey_found) {
|
|
|
+ LM_DBG("key '%.*s' for type '%.*s' not found, unable to use this type\n",
|
|
|
+ key->key.len, key->key.s, type_name->len, type_name->s);
|
|
|
+ if (key_name->s) {
|
|
|
+ pkg_free(key_name->s);
|
|
|
+ key_name->s = NULL;
|
|
|
+ key_name->len = 0;
|
|
|
+ }
|
|
|
+ *key_found = 0;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+err:
|
|
|
+ if (val.s)
|
|
|
+ pkg_free(val.s);
|
|
|
+ if(key_name->s) {
|
|
|
+ pkg_free(key_name->s);
|
|
|
+ key_name->s = NULL;
|
|
|
+ key_name->len = 0;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_build_entry_keys(km_redis_con_t *con, const str *table_name,
|
|
|
+ const db_key_t *_k, const db_val_t *_v, const int _n,
|
|
|
+ redis_key_t **keys, int *keys_count) {
|
|
|
+
|
|
|
+ struct str_hash_entry *table_e;
|
|
|
+ redis_table_t *table;
|
|
|
+ redis_key_t *key;
|
|
|
+ int key_found;
|
|
|
+ str type_name = str_init("entry");
|
|
|
+ str keyname = {NULL, 0};
|
|
|
+
|
|
|
+ LM_DBG("build entry keys\n");
|
|
|
+
|
|
|
+ table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
|
|
|
+ if (!table_e) {
|
|
|
+ LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!",
|
|
|
+ table_name->len, table_name->s);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ table = (redis_table_t*)table_e->u.p;
|
|
|
+ key = table->entry_keys;
|
|
|
+ if (db_redis_find_query_key(key, table_name, &type_name, _k, _v, _n, &keyname, &key_found) != 0) {
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (key_found) {
|
|
|
+ db_redis_key_add_str(keys, &keyname);
|
|
|
+
|
|
|
+ if (db_redis_key_add_str(keys, &keyname) != 0) {
|
|
|
+ LM_ERR("Failed to add key string\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ LM_DBG("found suitable entry key '%.*s' for query\n",
|
|
|
+ (*keys)->key.len, (*keys)->key.s);
|
|
|
+ *keys_count = 1;
|
|
|
+ pkg_free(keyname.s);
|
|
|
+ } else {
|
|
|
+ LM_ERR("Failed to create direct entry key, no matching key definition\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+err:
|
|
|
+ db_redis_key_free(keys);
|
|
|
+ if (keyname.s)
|
|
|
+ pkg_free(keyname.s);
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_get_keys_for_all_types(km_redis_con_t *con, const str *table_name,
|
|
|
+ redis_key_t **keys, int *keys_count) {
|
|
|
+
|
|
|
+ struct str_hash_entry *table_e;
|
|
|
+ redis_table_t *table;
|
|
|
+ redis_type_t *type;
|
|
|
+ redis_key_t *key;
|
|
|
+
|
|
|
+ *keys = NULL;
|
|
|
+ *keys_count = 0;
|
|
|
+
|
|
|
+ table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
|
|
|
+ if (!table_e) {
|
|
|
+ LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!",
|
|
|
+ table_name->len, table_name->s);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ table = (redis_table_t*)table_e->u.p;
|
|
|
+
|
|
|
+ for (type = table->types; type; type = type->next) {
|
|
|
+ for (key = type->keys; key; key = key->next) {
|
|
|
+ if (db_redis_key_add_str(keys, &key->key) != 0) {
|
|
|
+ LM_ERR("Failed to add key string\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ (*keys_count)++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+err:
|
|
|
+ db_redis_key_free(keys);
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_build_type_keys(km_redis_con_t *con, const str *table_name,
|
|
|
+ const db_key_t *_k, const db_val_t *_v, const int _n,
|
|
|
+ redis_key_t **keys, int *keys_count) {
|
|
|
+
|
|
|
+ struct str_hash_entry *table_e;
|
|
|
+ redis_table_t *table;
|
|
|
+ redis_type_t *type;
|
|
|
+ redis_key_t *key;
|
|
|
+
|
|
|
+ *keys = NULL;
|
|
|
+ *keys_count = 0;
|
|
|
+
|
|
|
+ LM_DBG("build type keys\n");
|
|
|
+
|
|
|
+ table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
|
|
|
+ if (!table_e) {
|
|
|
+ LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!",
|
|
|
+ table_name->len, table_name->s);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ table = (redis_table_t*)table_e->u.p;
|
|
|
+
|
|
|
+ for (type = table->types; type; type = type->next) {
|
|
|
+ str *type_name = &(type->type);
|
|
|
+ int key_found = 0;
|
|
|
+ str keyname = {NULL, 0};
|
|
|
+ key = type->keys;
|
|
|
+
|
|
|
+ if (db_redis_find_query_key(key, table_name, &type->type, _k, _v, _n, &keyname, &key_found) != 0) {
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (key_found) {
|
|
|
+ db_redis_key_add_str(keys, &keyname);
|
|
|
+ (*keys_count)++;
|
|
|
+ LM_DBG("found key '%.*s' for type '%.*s'\n",
|
|
|
+ keyname.len, keyname.s,
|
|
|
+ type_name->len, type_name->s);
|
|
|
+ pkg_free(keyname.s);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+err:
|
|
|
+ LM_ERR("Failed to get type key\n");
|
|
|
+ db_redis_key_free(keys);
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name,
|
|
|
+ const db_key_t *_k, const db_val_t *_v, const db_op_t *_op, const int _n,
|
|
|
+ redis_key_t **query_keys, int *query_keys_count, int **manual_keys, int *manual_keys_count,
|
|
|
+ int *do_table_scan) {
|
|
|
+
|
|
|
+ struct str_hash_entry *table_e;
|
|
|
+ redis_table_t *table;
|
|
|
+ redis_type_t *type;
|
|
|
+ redis_key_t *key;
|
|
|
+ str keyname;
|
|
|
+ int key_found;
|
|
|
+ redisReply *reply = NULL;
|
|
|
+ str typename = str_init(REDIS_DIRECT_PREFIX);
|
|
|
+
|
|
|
+ *query_keys = NULL;
|
|
|
+ *query_keys_count = 0;
|
|
|
+ *do_table_scan = 1;
|
|
|
+
|
|
|
+ LM_DBG("build query keys\n");
|
|
|
+
|
|
|
+ table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
|
|
|
+ if (!table_e) {
|
|
|
+ LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!",
|
|
|
+ table_name->len, table_name->s);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ table = (redis_table_t*)table_e->u.p;
|
|
|
+
|
|
|
+ // check if given keys directly match entry key
|
|
|
+ keyname.s = NULL;
|
|
|
+ keyname.len = 0;
|
|
|
+ key = table->entry_keys;
|
|
|
+
|
|
|
+ if (db_redis_find_query_key(key, table_name, &typename, _k, _v, _n, &keyname, &key_found) != 0) {
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (key_found) {
|
|
|
+ LM_DBG("found suitable entry key '%.*s' for query\n",
|
|
|
+ keyname.len, keyname.s);
|
|
|
+ db_redis_key_add_str(query_keys, &keyname);
|
|
|
+ *query_keys_count = 1;
|
|
|
+ pkg_free(keyname.s);
|
|
|
+ keyname.s = NULL;
|
|
|
+ } else {
|
|
|
+ LM_DBG("no direct entry key found, checking type keys\n");
|
|
|
+ for (type = table->types; type; type = type->next) {
|
|
|
+ key = type->keys;
|
|
|
+ LM_DBG("checking type '%.*s'\n", type->type.len, type->type.s);
|
|
|
+ if (db_redis_find_query_key(key, table_name, &type->type, _k, _v, _n, &keyname, &key_found) != 0) {
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (key_found) {
|
|
|
+ redis_key_t *query_v = NULL;
|
|
|
+ char *prefix = "SMEMBERS";
|
|
|
+
|
|
|
+ if (db_redis_key_add_string(&query_v, prefix, strlen(prefix)) != 0) {
|
|
|
+ LM_ERR("Failed to add smembers command to query\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, &keyname) != 0) {
|
|
|
+ LM_ERR("Failed to add key name to smembers query\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+
|
|
|
+ reply = db_redis_command_argv(con, query_v);
|
|
|
+ pkg_free(keyname.s);
|
|
|
+ keyname.s = NULL;
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ db_redis_check_reply(con, reply, err);
|
|
|
+ if (reply->type == REDIS_REPLY_ARRAY) {
|
|
|
+ if (reply->elements == 0) {
|
|
|
+ LM_DBG("type query returned empty list\n");
|
|
|
+ *query_keys_count = 0;
|
|
|
+ *do_table_scan = 0;
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ LM_DBG("populating query keys list with result of type query\n");
|
|
|
+ *query_keys_count = reply->elements;
|
|
|
+ for (int i = 0; i < reply->elements; ++i) {
|
|
|
+ redisReply *subreply = reply->element[i];
|
|
|
+ if (subreply->type == REDIS_REPLY_STRING) {
|
|
|
+ LM_DBG("adding resulting entry key '%s' from type query\n", subreply->str);
|
|
|
+ if (db_redis_key_add_string(query_keys, subreply->str, strlen(subreply->str)) != 0) {
|
|
|
+ LM_ERR("Failed to add query key\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LM_ERR("Unexpected entry key type in type query, expecting a string\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LM_ERR("Unexpected reply for type query, expecting an array\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (*query_keys_count > 0) {
|
|
|
+ LM_DBG("building manual keys\n");
|
|
|
+ if (db_redis_build_entry_manual_keys(table, _k, _v, _n, manual_keys, manual_keys_count) != 0) {
|
|
|
+ LM_ERR("Failed to build manual entry key list\n");
|
|
|
+ goto err;
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+err:
|
|
|
+ if (keyname.s) {
|
|
|
+ pkg_free(keyname.s);
|
|
|
+ keyname.s = NULL;
|
|
|
+ }
|
|
|
+ if (reply) {
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ }
|
|
|
+ db_redis_key_free(query_keys);
|
|
|
+ if (*manual_keys) {
|
|
|
+ pkg_free(*manual_keys);
|
|
|
+ *manual_keys = NULL;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
|
|
|
+ const db_key_t *_k, const int _n,
|
|
|
+ redis_key_t **query_keys, int *query_keys_count,
|
|
|
+ int **manual_keys, int *manual_keys_count) {
|
|
|
+
|
|
|
+ size_t i = 0;
|
|
|
+ redis_key_t *query_v = NULL;
|
|
|
+ char cursor_str[32] = "";
|
|
|
+ redisReply *reply = NULL;
|
|
|
+ unsigned long cursor = 0;
|
|
|
+ char *match = NULL;
|
|
|
+
|
|
|
+ str match_pattern = {":entry::*", strlen(":entry::*")};
|
|
|
+
|
|
|
+ *query_keys = NULL;
|
|
|
+ *query_keys_count = 0;
|
|
|
+ *manual_keys = NULL;
|
|
|
+ *manual_keys_count = 0;
|
|
|
+
|
|
|
+ do {
|
|
|
+ snprintf(cursor_str, sizeof(cursor_str), "%lu", cursor);
|
|
|
+ match = (char*)pkg_malloc(table_name->len + match_pattern.len + 1);
|
|
|
+ if (!match) {
|
|
|
+ LM_ERR("Failed to allocate memory for match pattern\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ snprintf(match, table_name->len + match_pattern.len + 1, "%s%s\n",
|
|
|
+ table_name->s, match_pattern.s);
|
|
|
+
|
|
|
+ if (db_redis_key_add_string(&query_v, "SCAN", 4) != 0) {
|
|
|
+ LM_ERR("Failed to add scan command to scan query\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_string(&query_v, cursor_str, strlen(cursor_str)) != 0) {
|
|
|
+ LM_ERR("Failed to add cursor to scan query\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_string(&query_v, "MATCH", 5) != 0) {
|
|
|
+ LM_ERR("Failed to add match command to scan query\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_string(&query_v, match, strlen(match)) != 0) {
|
|
|
+ LM_ERR("Failed to add match pattern to scan query\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ pkg_free(match); match = NULL;
|
|
|
+
|
|
|
+ reply = db_redis_command_argv(con, query_v);
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ db_redis_check_reply(con, reply, err);
|
|
|
+ if (reply->type != REDIS_REPLY_ARRAY) {
|
|
|
+ LM_ERR("Invalid reply type for scan on table '%.*s', expected array\n",
|
|
|
+ table_name->len, table_name->s);
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (reply->elements != 2) {
|
|
|
+ LM_ERR("Invalid number of reply elements for scan on table '%.*s', expected 2, got %lu\n",
|
|
|
+ table_name->len, table_name->s, reply->elements);
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (reply->element[0]->type == REDIS_REPLY_STRING) {
|
|
|
+ cursor = atol(reply->element[0]->str);
|
|
|
+ } else if (reply->element[0]->type == REDIS_REPLY_INTEGER) {
|
|
|
+ // should not happen, but play it safe
|
|
|
+ cursor = reply->element[0]->integer;
|
|
|
+ } else {
|
|
|
+ LM_ERR("Invalid cursor type for scan on table '%.*s', expected string or integer\n",
|
|
|
+ table_name->len, table_name->s);
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (reply->element[1]->type != REDIS_REPLY_ARRAY) {
|
|
|
+ LM_ERR("Invalid content type for scan on table '%.*s', expected array\n",
|
|
|
+ table_name->len, table_name->s);
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (reply->element[1]->elements == 0) {
|
|
|
+ LM_DBG("no matching entries found for scan on table '%.*s'\n",
|
|
|
+ table_name->len, table_name->s);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ *query_keys_count += reply->element[1]->elements;
|
|
|
+
|
|
|
+ for (size_t j = 0; j < reply->element[1]->elements; ++i, ++j) {
|
|
|
+ redisReply *key = reply->element[1]->element[j];
|
|
|
+ if (!key) {
|
|
|
+ LM_ERR("Invalid null key at cursor result index %lu while scanning table '%.*s'\n",
|
|
|
+ j, table_name->len, table_name->s);
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (key->type != REDIS_REPLY_STRING) {
|
|
|
+ LM_ERR("Invalid key type at cursor result index %lu while scanning table '%.*s', expected string\n",
|
|
|
+ j, table_name->len, table_name->s);
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_string(query_keys, key->str, strlen(key->str)) != 0) {
|
|
|
+ LM_ERR("Failed to add redis key\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ } while (cursor > 0);
|
|
|
+
|
|
|
+ // for full table scans, we have to manually match all given keys
|
|
|
+ *manual_keys_count = _n;
|
|
|
+ *manual_keys = (int*)pkg_malloc(*manual_keys_count * sizeof(int));
|
|
|
+ if (! *manual_keys) {
|
|
|
+ LM_ERR("Failed to allocate memory for manual keys\n");
|
|
|
+ goto err;
|
|
|
+ }
|
|
|
+ memset(*manual_keys, 0, *manual_keys_count * sizeof(int));
|
|
|
+ for (int i = 0; i < _n; ++i) {
|
|
|
+ (*manual_keys)[i] = i;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (reply) {
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+
|
|
|
+err:
|
|
|
+ if (match)
|
|
|
+ pkg_free(match);
|
|
|
+ if (reply)
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ db_redis_key_free(query_keys);
|
|
|
+ *query_keys_count = 0;
|
|
|
+ if (*manual_keys) {
|
|
|
+ pkg_free(*manual_keys);
|
|
|
+ *manual_keys = NULL;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_compare_column(db_key_t k, db_val_t *v, db_op_t op, redisReply *reply) {
|
|
|
+ int i_value;
|
|
|
+ long long ll_value;
|
|
|
+ double d_value;
|
|
|
+ str *tmpstr;
|
|
|
+ char tmp[32] = "";
|
|
|
+
|
|
|
+ int vtype = VAL_TYPE(v);
|
|
|
+
|
|
|
+ if (VAL_NULL(v) && reply->type == REDIS_REPLY_NIL) {
|
|
|
+ LM_DBG("comparing matching NULL values\n");
|
|
|
+ return 0;
|
|
|
+ } else if (VAL_NULL(v) || reply->type == REDIS_REPLY_NIL) {
|
|
|
+ LM_DBG("comparing non-matching NULL values\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (vtype) {
|
|
|
+ case DB1_INT:
|
|
|
+ i_value = atoi(reply->str);
|
|
|
+ LM_DBG("comparing INT %d %s %d\n", i_value, op, VAL_INT(v));
|
|
|
+ if (!strcmp(op, OP_EQ)) {
|
|
|
+ if (i_value == VAL_INT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_LT)) {
|
|
|
+ if (i_value < VAL_INT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_GT)) {
|
|
|
+ if (i_value > VAL_INT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_LEQ)) {
|
|
|
+ if (i_value <= VAL_INT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_GEQ)) {
|
|
|
+ if (i_value >= VAL_INT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_NEQ)) {
|
|
|
+ if (i_value != VAL_INT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_BITWISE_AND)) {
|
|
|
+ if (i_value & VAL_INT(v))
|
|
|
+ return 0;
|
|
|
+ } else {
|
|
|
+ LM_ERR("Unsupported op type '%s'\n", op);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ case DB1_BIGINT:
|
|
|
+ ll_value = atoll(reply->str);
|
|
|
+ LM_DBG("comparing BIGINT %lld %s %lld\n", ll_value, op, VAL_BIGINT(v));
|
|
|
+ if (!strcmp(op, OP_EQ)) {
|
|
|
+ if (ll_value == VAL_BIGINT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_LT)) {
|
|
|
+ if (ll_value < VAL_BIGINT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_GT)) {
|
|
|
+ if (ll_value > VAL_BIGINT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_LEQ)) {
|
|
|
+ if (ll_value <= VAL_BIGINT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_GEQ)) {
|
|
|
+ if (ll_value >= VAL_BIGINT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_NEQ)) {
|
|
|
+ if (ll_value != VAL_BIGINT(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_BITWISE_AND)) {
|
|
|
+ if (ll_value & VAL_BIGINT(v))
|
|
|
+ return 0;
|
|
|
+ } else {
|
|
|
+ LM_ERR("Unsupported op type '%s'\n", op);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ case DB1_STRING:
|
|
|
+ LM_DBG("comparing STRING %s %s %s\n", reply->str, op, VAL_STRING(v));
|
|
|
+ if (!strcmp(op, OP_EQ)) {
|
|
|
+ return (strcmp(reply->str, VAL_STRING(v)) == 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_LT)) {
|
|
|
+ return (strcmp(reply->str, VAL_STRING(v)) < 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_GT)) {
|
|
|
+ return (strcmp(reply->str, VAL_STRING(v)) > 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_LEQ)) {
|
|
|
+ return (strcmp(reply->str, VAL_STRING(v)) <= 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_GEQ)) {
|
|
|
+ return (strcmp(reply->str, VAL_STRING(v)) >= 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_NEQ)) {
|
|
|
+ return (strcmp(reply->str, VAL_STRING(v)) != 0) ? 0 : -1;
|
|
|
+ } else {
|
|
|
+ LM_ERR("Unsupported op type '%s'\n", op);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ case DB1_STR:
|
|
|
+ tmpstr = (struct _str*) &(VAL_STR(v));
|
|
|
+ LM_DBG("comparing STR %s %s %.*s\n", reply->str, op, tmpstr->len, tmpstr->s);
|
|
|
+ if (!strcmp(op, OP_EQ)) {
|
|
|
+ return (strncmp(reply->str, tmpstr->s, tmpstr->len) == 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_LT)) {
|
|
|
+ return (strncmp(reply->str, tmpstr->s, tmpstr->len) < 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_GT)) {
|
|
|
+ return (strncmp(reply->str, tmpstr->s, tmpstr->len) > 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_LEQ)) {
|
|
|
+ return (strncmp(reply->str, tmpstr->s, tmpstr->len) <= 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_GEQ)) {
|
|
|
+ return (strncmp(reply->str, tmpstr->s, tmpstr->len) >= 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_NEQ)) {
|
|
|
+ return (strncmp(reply->str, tmpstr->s, tmpstr->len) != 0) ? 0 : -1;
|
|
|
+ } else {
|
|
|
+ LM_ERR("Unsupported op type '%s'\n", op);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ case DB1_DOUBLE:
|
|
|
+ d_value = atof(reply->str);
|
|
|
+ LM_DBG("comparing DOUBLE %f %s %f\n", d_value, op, VAL_DOUBLE(v));
|
|
|
+ if (!strcmp(op, OP_EQ)) {
|
|
|
+ return (d_value == VAL_DOUBLE(v)) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_LT)) {
|
|
|
+ return (d_value < VAL_DOUBLE(v)) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_GT)) {
|
|
|
+ return (d_value > VAL_DOUBLE(v)) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_LEQ)) {
|
|
|
+ return (d_value <= VAL_DOUBLE(v)) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_GEQ)) {
|
|
|
+ return (d_value >= VAL_DOUBLE(v)) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_NEQ)) {
|
|
|
+ return (d_value != VAL_DOUBLE(v)) ? 0 : -1;
|
|
|
+ } else {
|
|
|
+ LM_ERR("Unsupported op type '%s'\n", op);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ case DB1_DATETIME:
|
|
|
+ // TODO: insert int value to db for faster comparison!
|
|
|
+ strftime(tmp, sizeof(tmp), "%Y-%m-%d %H:%M:%S", localtime(&(VAL_TIME(v))));
|
|
|
+ LM_DBG("comparing DATETIME %s %s %s\n", reply->str, op, tmp);
|
|
|
+ if (!strcmp(op, OP_EQ)) {
|
|
|
+ return (strcmp(reply->str, tmp) == 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_LT)) {
|
|
|
+ return (strcmp(reply->str, tmp) < 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_GT)) {
|
|
|
+ return (strcmp(reply->str, tmp) > 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_LEQ)) {
|
|
|
+ return (strcmp(reply->str, tmp) <= 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_GEQ)) {
|
|
|
+ return (strcmp(reply->str, tmp) >= 0) ? 0 : -1;
|
|
|
+ } else if (!strcmp(op, OP_NEQ)) {
|
|
|
+ return (strcmp(reply->str, tmp) != 0) ? 0 : -1;
|
|
|
+ } else {
|
|
|
+ LM_ERR("Unsupported op type '%s'\n", op);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ case DB1_BITMAP:
|
|
|
+ i_value = atoi(reply->str);
|
|
|
+ LM_DBG("comparing BITMAP %d %s %d\n", i_value, op, VAL_BITMAP(v));
|
|
|
+ if (!strcmp(op, OP_EQ)) {
|
|
|
+ if (i_value == VAL_BITMAP(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_LT)) {
|
|
|
+ if (i_value < VAL_BITMAP(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_GT)) {
|
|
|
+ if (i_value > VAL_BITMAP(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_LEQ)) {
|
|
|
+ if (i_value <= VAL_BITMAP(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_GEQ)) {
|
|
|
+ if (i_value >= VAL_BITMAP(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_NEQ)) {
|
|
|
+ if (i_value != VAL_BITMAP(v))
|
|
|
+ return 0;
|
|
|
+ } else if (!strcmp(op, OP_BITWISE_AND)) {
|
|
|
+ if (i_value & VAL_BITMAP(v))
|
|
|
+ return 0;
|
|
|
+ } else {
|
|
|
+ LM_ERR("Unsupported op type '%s'\n", op);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ case DB1_BLOB:
|
|
|
+ default:
|
|
|
+ LM_ERR("Unsupported val type %d\n", vtype);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_convert_row(km_redis_con_t *con, db1_res_t* _r, const db_key_t *_k,
|
|
|
+ const db_val_t *_v, const db_op_t *_op,
|
|
|
+ redisReply *reply, const str *table_name, const db_key_t* _c, int _nc,
|
|
|
+ int *manual_keys, int manual_keys_count) {
|
|
|
+ db_val_t* dval;
|
|
|
+ db_row_t* drow;
|
|
|
+
|
|
|
+ if (reply->type != REDIS_REPLY_ARRAY) {
|
|
|
+ LM_ERR("Unexpected redis reply type, expecting array\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!reply->elements) {
|
|
|
+ LM_DBG("skip empty row");
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ // manually filter non-matching replies
|
|
|
+ for (size_t col = 0; col < reply->elements; ++col) {
|
|
|
+ if (col < manual_keys_count) {
|
|
|
+ int idx = manual_keys[col];
|
|
|
+ db_key_t k = _k[idx];
|
|
|
+ db_val_t v = _v[idx];
|
|
|
+ db_op_t o = _op[idx];
|
|
|
+ LM_DBG("manually filtering key '%.*s'\n",
|
|
|
+ k->len, k->s);
|
|
|
+ if (db_redis_compare_column(k, &v, o, reply->element[col]) != 0) {
|
|
|
+ LM_DBG("column %lu does not match, ignore row\n", col);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ RES_NUM_ROWS(_r) = RES_ROW_N(_r) = RES_NUM_ROWS(_r) + 1;
|
|
|
+ drow = &(RES_ROWS(_r)[RES_NUM_ROWS(_r)-1]);
|
|
|
+
|
|
|
+ if (db_allocate_row(_r, drow) != 0) {
|
|
|
+ LM_ERR("Failed to allocate row %d\n", RES_NUM_ROWS(_r));
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (reply->elements - manual_keys_count > RES_COL_N(_r)) {
|
|
|
+ LM_ERR("Invalid number of columns at row %d/%d, expecting %d, got %lu\n",
|
|
|
+ RES_NUM_ROWS(_r), RES_ROW_N(_r), RES_COL_N(_r), reply->elements - manual_keys_count);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ for (size_t col = manual_keys_count; col < reply->elements; ++col) {
|
|
|
+ size_t colidx = col - manual_keys_count;
|
|
|
+ size_t redisidx = col;
|
|
|
+ int coltype;
|
|
|
+ redisReply *col_val = reply->element[redisidx];
|
|
|
+ str *col_name = _c[colidx];
|
|
|
+
|
|
|
+ LM_DBG("converting column #%lu of row #%d", colidx, RES_ROW_N(_r));
|
|
|
+
|
|
|
+ if (col_val->type != REDIS_REPLY_STRING &&
|
|
|
+ col_val->type != REDIS_REPLY_NIL) {
|
|
|
+
|
|
|
+ LM_ERR("Invalid column value type in column '%.*s' of row %d, expecting string or null\n",
|
|
|
+ col_name->len, col_name->s, RES_NUM_ROWS(_r));
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (RES_NUM_ROWS(_r) == 1) {
|
|
|
+ coltype = db_redis_schema_get_column_type(con, table_name, col_name);
|
|
|
+ RES_TYPES(_r)[colidx] = coltype;
|
|
|
+ } else {
|
|
|
+ coltype = RES_TYPES(_r)[colidx];
|
|
|
+ }
|
|
|
+
|
|
|
+ dval = &(ROW_VALUES(drow)[colidx]);
|
|
|
+ VAL_TYPE(dval) = coltype;
|
|
|
+
|
|
|
+ if (col_val->type == REDIS_REPLY_NIL) {
|
|
|
+ VAL_NULL(dval) = 1;
|
|
|
+ } else {
|
|
|
+ if (db_str2val(coltype, dval, col_val->str, strlen(col_val->str), 1) != 0) {
|
|
|
+ LM_ERR("Failed to convert redis column '%.*s' to db value\n",
|
|
|
+ col_name->len, col_name->s);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, const db_key_t* _k,
|
|
|
+ const db_val_t* _v, const db_op_t *_op, const db_key_t* _c,
|
|
|
+ const int _n, const int _nc, db1_res_t** _r,
|
|
|
+ redis_key_t **keys, int *keys_count,
|
|
|
+ int **manual_keys, int *manual_keys_count, int do_table_scan) {
|
|
|
+
|
|
|
+ redisReply *reply = NULL;
|
|
|
+ redis_key_t *query_v = NULL;
|
|
|
+ int num_rows = 0;
|
|
|
+
|
|
|
+ *_r = db_redis_new_result();
|
|
|
+ if (!*_r) {
|
|
|
+ LM_ERR("Failed to allocate memory for result");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (db_allocate_columns(*_r, _nc) != 0) {
|
|
|
+ LM_ERR("Failed to allocate memory for result columns");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
|
|
|
+ RES_COL_N(*_r) = _nc;
|
|
|
+
|
|
|
+ if (!keys_count && do_table_scan) {
|
|
|
+ LM_DBG("performing full table scan\n");
|
|
|
+ if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n,
|
|
|
+ keys, keys_count,
|
|
|
+ manual_keys, manual_keys_count) != 0) {
|
|
|
+ LM_ERR("failed to scan query keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for (redis_key_t *key = *keys; key; key = key->next) {
|
|
|
+ redis_key_t *tmp = NULL;
|
|
|
+ str *keyname = &(key->key);
|
|
|
+
|
|
|
+ num_rows++;
|
|
|
+
|
|
|
+ LM_DBG("checking key '%s' in redis\n", keyname->s);
|
|
|
+
|
|
|
+ if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
|
|
|
+ LM_ERR("Failed to add exists query to list\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, keyname) != 0) {
|
|
|
+ LM_ERR("Failed to add key name to list\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
|
|
|
+ LM_ERR("Failed to append redis command\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ tmp = db_redis_key_unshift(&query_v);
|
|
|
+ if (tmp)
|
|
|
+ db_redis_key_free(&tmp);
|
|
|
+
|
|
|
+ // construct HMGET query
|
|
|
+ if (_nc + (*manual_keys_count) == 0) {
|
|
|
+ if (db_redis_key_prepend_string(&query_v, "HGETALL", 7) != 0) {
|
|
|
+ LM_ERR("Failed to add hgetall query to list\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (db_redis_key_prepend_string(&query_v, "HMGET", 5) != 0) {
|
|
|
+ LM_ERR("Failed to add hmget query to list\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // we put the manual comparison columns first, so we can skip them
|
|
|
+ // easily in result, for the cost of potential duplicate column returns
|
|
|
+ for (int j = 0; j < *manual_keys_count; ++j) {
|
|
|
+ int idx = (*manual_keys)[j];
|
|
|
+ str *k_name = _k[idx];
|
|
|
+ if (db_redis_key_add_str(&query_v, k_name) != 0) {
|
|
|
+ LM_ERR("Failed to add manual key to query list\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (int j = 0; j < _nc; ++j) {
|
|
|
+ str *k_name = _c[j];
|
|
|
+ if (db_redis_key_add_str(&query_v, k_name) != 0) {
|
|
|
+ LM_ERR("Failed to add manual key to query list\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
|
|
|
+ LM_ERR("Failed to append redis command\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ query_v = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ // we allocate best case scenario (all rows match)
|
|
|
+ RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = num_rows;
|
|
|
+ if (db_allocate_rows(*_r) != 0) {
|
|
|
+ LM_ERR("Failed to allocate memory for rows\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ RES_COL_N(*_r) = _nc;
|
|
|
+ // reset and increment in convert_row
|
|
|
+ RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
|
|
|
+
|
|
|
+ for (redis_key_t *key = *keys; key; key = key->next) {
|
|
|
+ // get reply for EXISTS query
|
|
|
+ if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
|
|
|
+ LM_ERR("Failed to get reply for query: %s\n",
|
|
|
+ con->con->errstr);
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ if (reply->integer == 0) {
|
|
|
+ LM_DBG("key does not exist, returning no row for query\n");
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ // also free next reply, as this is a null row for the HMGET
|
|
|
+ db_redis_get_reply(con, (void**)&reply);
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+
|
|
|
+ // get reply for actual HMGET query
|
|
|
+ if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
|
|
|
+ LM_ERR("Failed to get reply for query: %s\n",
|
|
|
+ con->con->errstr);
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ if (reply->type != REDIS_REPLY_ARRAY) {
|
|
|
+ LM_ERR("Unexpected reply, expected array\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ LM_DBG("dumping full query reply for row\n");
|
|
|
+ db_redis_dump_reply(reply);
|
|
|
+
|
|
|
+ if (db_redis_convert_row(con, *_r, _k, _v, _op, reply, CON_TABLE(_h), _c, _nc, *manual_keys, *manual_keys_count)) {
|
|
|
+ LM_ERR("Failed to convert redis reply for row\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+error:
|
|
|
+ LM_ERR("failed to perform the query\n");
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ if(reply)
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ if(_r && *_r) {
|
|
|
+ db_redis_free_result((db1_con_t*)_h, *_r); *_r = NULL;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, const db_key_t* _k,
|
|
|
+ const db_val_t* _v, const db_op_t *_op, const int _n,
|
|
|
+ redis_key_t *keys, int keys_count,
|
|
|
+ int *manual_keys, int manual_keys_count, int do_table_scan) {
|
|
|
+
|
|
|
+ int j = 0;
|
|
|
+ redis_key_t *k = NULL;
|
|
|
+ int type_keys_count = 0;
|
|
|
+ int all_type_keys_count = 0;
|
|
|
+
|
|
|
+ redisReply *reply = NULL;
|
|
|
+ redis_key_t *query_v = NULL;
|
|
|
+ redis_key_t *type_keys = NULL;
|
|
|
+ redis_key_t *all_type_keys = NULL;
|
|
|
+ db_val_t *db_vals = NULL;
|
|
|
+ db_key_t *db_keys = NULL;
|
|
|
+
|
|
|
+ if (!keys_count && do_table_scan) {
|
|
|
+ LM_DBG("performing full table scan\n");
|
|
|
+ if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n,
|
|
|
+ &keys, &keys_count,
|
|
|
+ &manual_keys, &manual_keys_count) != 0) {
|
|
|
+ LM_ERR("failed to scan query keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO: this should be moved to redis_connection structure
|
|
|
+ // and be parsed at startup:
|
|
|
+ //
|
|
|
+ // fetch list of keys in all types
|
|
|
+ if (db_redis_get_keys_for_all_types(con, CON_TABLE(_h),
|
|
|
+ &all_type_keys, &all_type_keys_count) != 0) {
|
|
|
+ LM_ERR("failed to get full list of type keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ LM_DBG("+++ delete all keys\n");
|
|
|
+ for (k = keys; k; k = k->next) {
|
|
|
+ redis_key_t *all_type_key;
|
|
|
+ str *key = &k->key;
|
|
|
+ redis_key_t *tmp = NULL;
|
|
|
+ int row_match;
|
|
|
+ LM_DBG("+++ delete key '%.*s'\n", key->len, key->s);
|
|
|
+
|
|
|
+ if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
|
|
|
+ LM_ERR("Failed to add exists command to pre-delete query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, key) != 0) {
|
|
|
+ LM_ERR("Failed to add key name to pre-delete query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO: pipeline commands!
|
|
|
+ reply = db_redis_command_argv(con, query_v);
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ if (reply->integer == 0) {
|
|
|
+ LM_DBG("key does not exist in redis, skip deleting\n");
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ tmp = db_redis_key_unshift(&query_v);
|
|
|
+ if (tmp)
|
|
|
+ db_redis_key_free(&tmp);
|
|
|
+
|
|
|
+ if (db_redis_key_prepend_string(&query_v, "HMGET", 5) != 0) {
|
|
|
+ LM_ERR("Failed to set hmget command to pre-delete query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ // add all manual keys to query
|
|
|
+ for (j = 0; j < manual_keys_count; ++j) {
|
|
|
+ int idx = manual_keys[j];
|
|
|
+ str *col = _k[idx];
|
|
|
+
|
|
|
+ if (db_redis_key_add_str(&query_v, col) != 0) {
|
|
|
+ LM_ERR("Failed to add manual key to pre-delete query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // add all type keys to query
|
|
|
+ for (all_type_key = all_type_keys; all_type_key; all_type_key = all_type_key->next) {
|
|
|
+ if (db_redis_key_add_str(&query_v, &all_type_key->key) != 0) {
|
|
|
+ LM_ERR("Failed to add type key to pre-delete query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ reply = db_redis_command_argv(con, query_v);
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+
|
|
|
+ LM_DBG("dumping full query reply\n");
|
|
|
+ db_redis_dump_reply(reply);
|
|
|
+
|
|
|
+ // manually filter non-matching replies
|
|
|
+ row_match = 1;
|
|
|
+ for (size_t col = 0; col < reply->elements; ++col) {
|
|
|
+ if (col < manual_keys_count) {
|
|
|
+ int idx = manual_keys[col];
|
|
|
+ db_key_t k = _k[idx];
|
|
|
+ db_val_t v = _v[idx];
|
|
|
+ db_op_t o = _op[idx];
|
|
|
+ LM_DBG("manually filtering key '%.*s'\n",
|
|
|
+ k->len, k->s);
|
|
|
+ if (db_redis_compare_column(k, &v, o, reply->element[col]) != 0) {
|
|
|
+ LM_DBG("column %lu does not match, ignore row\n", col);
|
|
|
+ row_match = 0;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!row_match) {
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ continue;
|
|
|
+ } else {
|
|
|
+ LM_DBG("row matches manual filtering, proceed with deletion\n");
|
|
|
+ }
|
|
|
+
|
|
|
+ db_keys = (db_key_t*) pkg_malloc(all_type_keys_count * sizeof(db_key_t));
|
|
|
+ if (!db_keys) {
|
|
|
+ LM_ERR("Failed to allocate memory for db type keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ for (j = 0, tmp = all_type_keys; tmp; ++j, tmp = tmp->next) {
|
|
|
+ db_keys[j] = &tmp->key;
|
|
|
+ }
|
|
|
+
|
|
|
+ db_vals = (db_val_t*) pkg_malloc(all_type_keys_count * sizeof(db_val_t));
|
|
|
+ if (!db_vals) {
|
|
|
+ LM_ERR("Failed to allocate memory for manual db vals\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (j = 0, all_type_key = all_type_keys; all_type_key; ++j, all_type_key = all_type_key->next) {
|
|
|
+ db_val_t *v = &(db_vals[j]);
|
|
|
+ str *key = &all_type_key->key;
|
|
|
+ char *value = reply->element[manual_keys_count + j]->str;
|
|
|
+ int coltype = db_redis_schema_get_column_type(con, CON_TABLE(_h), key);
|
|
|
+ if (value == NULL) {
|
|
|
+ VAL_NULL(v) = 1;
|
|
|
+ } else if (db_str2val(coltype, v, value, strlen(value), 0) != 0) {
|
|
|
+ LM_ERR("Failed to convert redis reply column to db value\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (db_redis_build_type_keys(con, CON_TABLE(_h), db_keys, db_vals, all_type_keys_count,
|
|
|
+ &type_keys, &type_keys_count) != 0) {
|
|
|
+ LM_ERR("failed to build type keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ pkg_free(db_keys);
|
|
|
+ pkg_free(db_vals);
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+
|
|
|
+ if (db_redis_key_add_string(&query_v, "DEL", 3) != 0) {
|
|
|
+ LM_ERR("Failed to add del command to delete query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, key) != 0) {
|
|
|
+ LM_ERR("Failed to add key to delete query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ reply = db_redis_command_argv(con, query_v);
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+
|
|
|
+ for (redis_key_t *type_key = type_keys; type_key; type_key = type_key->next) {
|
|
|
+ if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) {
|
|
|
+ LM_ERR("Failed to add srem command to post-delete query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
|
|
|
+ LM_ERR("Failed to add key to delete query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, key) != 0) {
|
|
|
+ LM_ERR("Failed to add key to delete query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ reply = db_redis_command_argv(con, query_v);
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ }
|
|
|
+
|
|
|
+ //db_redis_key_free(&type_keys);
|
|
|
+ LM_DBG("+++ done with loop '%.*s'\n", k->key.len, k->key.s);
|
|
|
+ }
|
|
|
+ pkg_free(query_v);
|
|
|
+ db_redis_key_free(&type_keys);
|
|
|
+ db_redis_key_free(&all_type_keys);
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+error:
|
|
|
+ LM_ERR("failed to perform the delete\n");
|
|
|
+ if(reply)
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ if (db_keys)
|
|
|
+ pkg_free(db_keys);
|
|
|
+ if (db_vals)
|
|
|
+ pkg_free(db_vals);
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ db_redis_key_free(&type_keys);
|
|
|
+ db_redis_key_free(&all_type_keys);
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, const db_key_t* _k,
|
|
|
+ const db_val_t* _v, const db_op_t *_op, const db_key_t* _uk, const db_val_t *_uv,
|
|
|
+ const int _n, const int _nu,
|
|
|
+ redis_key_t **keys, int *keys_count,
|
|
|
+ int **manual_keys, int *manual_keys_count, int do_table_scan) {
|
|
|
+
|
|
|
+ redisReply *reply = NULL;
|
|
|
+ redis_key_t *query_v = NULL;
|
|
|
+ int update_queries = 0;
|
|
|
+
|
|
|
+ if (!keys_count && do_table_scan) {
|
|
|
+ LM_DBG("performing full table scan\n");
|
|
|
+ if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n,
|
|
|
+ keys, keys_count,
|
|
|
+ manual_keys, manual_keys_count) != 0) {
|
|
|
+ LM_ERR("failed to scan query keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for (redis_key_t *key = *keys; key; key = key->next) {
|
|
|
+ str *keyname = &key->key;
|
|
|
+
|
|
|
+ LM_DBG("fetching row for '%.*s' from redis\n", keyname->len, keyname->s);
|
|
|
+
|
|
|
+
|
|
|
+ if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
|
|
|
+ LM_ERR("Failed to set exists command to pre-update exists query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, keyname) != 0) {
|
|
|
+ LM_ERR("Failed to add key name to pre-update exists query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
|
|
|
+ LM_ERR("Failed to append redis command\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+
|
|
|
+ // construct HMGET query
|
|
|
+ if ((*manual_keys_count) == 0) {
|
|
|
+ if (db_redis_key_add_string(&query_v, "HGETALL", 7) != 0) {
|
|
|
+ LM_ERR("Failed to set hgetall command to pre-update hget query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ // TODO: actually we wouldn't have to fetch it at all, but then we'd
|
|
|
+ // have to mark this key telling to not fetch reply of HMGET after
|
|
|
+ // EXISTS returns false!
|
|
|
+ } else {
|
|
|
+ if (db_redis_key_add_string(&query_v, "HMGET", 5) != 0) {
|
|
|
+ LM_ERR("Failed to set hgetall command to pre-update hget query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, keyname) != 0) {
|
|
|
+ LM_ERR("Failed to add key name to pre-update exists query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int j = 0; j < *manual_keys_count; ++j) {
|
|
|
+ int idx = (*manual_keys)[j];
|
|
|
+ str *k_name = _k[idx];
|
|
|
+ if (db_redis_key_add_str(&query_v, k_name) != 0) {
|
|
|
+ LM_ERR("Failed to add manual key name to hget query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
|
|
|
+ LM_ERR("Failed to append redis command\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ }
|
|
|
+
|
|
|
+/*
|
|
|
+ key_value = (str*)pkg_malloc(_nu * sizeof(str));
|
|
|
+ if (!key_value) {
|
|
|
+ LM_ERR("Failed to allocate memory for key buffer\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ memset(key_value, 0, _nu * sizeof(str));
|
|
|
+
|
|
|
+ col_value = (str*)pkg_malloc(_nu * sizeof(str));
|
|
|
+ if (!col_value) {
|
|
|
+ LM_ERR("Failed to allocate memory for column buffer\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ memset(col_value, 0, _nu * sizeof(str));
|
|
|
+ */
|
|
|
+
|
|
|
+
|
|
|
+ for (redis_key_t *key = *keys; key; key = key->next) {
|
|
|
+ int row_match;
|
|
|
+
|
|
|
+ LM_DBG("fetching replies for '%.*s' from redis\n", key->key.len, key->key.s);
|
|
|
+
|
|
|
+ // get reply for EXISTS query
|
|
|
+ if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
|
|
|
+ LM_ERR("Failed to get reply for query: %s\n",
|
|
|
+ con->con->errstr);
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ if (reply->integer == 0) {
|
|
|
+ LM_DBG("key does not exist, returning no row for query\n");
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ // also free next reply, as this is a null row for the HMGET
|
|
|
+ LM_DBG("also fetch hmget reply after non-existent key\n");
|
|
|
+ db_redis_get_reply(con, (void**)&reply);
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ LM_DBG("continue fetch reply loop\n");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+
|
|
|
+ // get reply for actual HMGET query
|
|
|
+ if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
|
|
|
+ LM_ERR("Failed to get reply for query: %s\n",
|
|
|
+ con->con->errstr);
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ if (reply->type != REDIS_REPLY_ARRAY) {
|
|
|
+ LM_ERR("Unexpected reply, expected array\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ LM_DBG("dumping full query reply for row\n");
|
|
|
+ db_redis_dump_reply(reply);
|
|
|
+
|
|
|
+ // manually filter non-matching replies
|
|
|
+ row_match = 1;
|
|
|
+ for (size_t col = 0; col < reply->elements; ++col) {
|
|
|
+ if (col < *manual_keys_count) {
|
|
|
+ int idx = (*manual_keys)[col];
|
|
|
+ db_key_t k = _k[idx];
|
|
|
+ db_val_t v = _v[idx];
|
|
|
+ db_op_t o = _op[idx];
|
|
|
+ LM_DBG("manually filtering key '%.*s'\n",
|
|
|
+ k->len, k->s);
|
|
|
+ if (db_redis_compare_column(k, &v, o, reply->element[col]) != 0) {
|
|
|
+ LM_DBG("column %lu does not match, ignore row\n", col);
|
|
|
+ row_match = 0;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ if (!row_match) {
|
|
|
+ continue;
|
|
|
+ } else {
|
|
|
+ LM_DBG("row matches manual filtering, proceed with update\n");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (db_redis_key_add_string(&query_v, "HMSET", 5) != 0) {
|
|
|
+ LM_ERR("Failed to add hmset command to update query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, &key->key) != 0) {
|
|
|
+ LM_ERR("Failed to add key to update query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < _nu; ++i) {
|
|
|
+ str *k = _uk[i];
|
|
|
+ str v = {NULL, 0};
|
|
|
+
|
|
|
+ if (db_redis_val2str(&(_uv[i]), &v) != 0) {
|
|
|
+ LM_ERR("Failed to convert update value to string\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, k) != 0) {
|
|
|
+ LM_ERR("Failed to add key to update query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, &v) != 0) {
|
|
|
+ LM_ERR("Failed to add key to update query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ pkg_free(v.s);
|
|
|
+ }
|
|
|
+ update_queries++;
|
|
|
+ if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
|
|
|
+ LM_ERR("Failed to append redis command\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ }
|
|
|
+
|
|
|
+ LM_DBG("getting replies for %d queries\n", update_queries);
|
|
|
+
|
|
|
+ for (int i = 0; i < update_queries; ++i) {
|
|
|
+ if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
|
|
|
+ LM_ERR("Failed to get reply for query: %s\n",
|
|
|
+ con->con->errstr);
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ }
|
|
|
+
|
|
|
+ LM_DBG("done performing update\n");
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+error:
|
|
|
+ LM_ERR("failed to perform the update\n");
|
|
|
+ if (reply)
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+/*
|
|
|
+ * Query table for specified rows
|
|
|
+ * _h: structure representing database connection
|
|
|
+ * _k: key names
|
|
|
+ * _op: operators
|
|
|
+ * _v: values of the keys that must match
|
|
|
+ * _c: column names to return
|
|
|
+ * _n: number of key=values pairs to compare
|
|
|
+ * _nc: number of columns to return
|
|
|
+ * _o: order by the specified column
|
|
|
+ */
|
|
|
+int db_redis_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
|
|
|
+ const db_val_t* _v, const db_key_t* _c, const int _n, const int _nc,
|
|
|
+ const db_key_t _o, db1_res_t** _r)
|
|
|
+{
|
|
|
+ km_redis_con_t *con = NULL;
|
|
|
+ int free_op = 0;
|
|
|
+ int do_table_scan = 0;
|
|
|
+
|
|
|
+ redis_key_t *keys = NULL;
|
|
|
+ int keys_count = 0;
|
|
|
+ int *manual_keys = NULL;
|
|
|
+ int manual_keys_count = 0;
|
|
|
+ db_op_t *query_ops = NULL;
|
|
|
+
|
|
|
+ // TODO: implement order-by
|
|
|
+ // TODO: optimize mapping-based manual post-check (remove check for keys already
|
|
|
+ // in type query key)
|
|
|
+
|
|
|
+ con = REDIS_CON(_h);
|
|
|
+ if (con && con->con == NULL) {
|
|
|
+ if (db_redis_connect(con) != 0) {
|
|
|
+ LM_ERR("Failed to reconnect to server\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(con == NULL || con->id == NULL || con->con == NULL) {
|
|
|
+ LM_ERR("connection to server is null\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ if(CON_TABLE(_h)->s == NULL) {
|
|
|
+ LM_ERR("prefix (table) name not set\n");
|
|
|
+ return -1;
|
|
|
+ } else {
|
|
|
+ LM_DBG("querying prefix (table) '%.*s'\n",
|
|
|
+ CON_TABLE(_h)->len, CON_TABLE(_h)->s);
|
|
|
+ }
|
|
|
+
|
|
|
+ if(_r) *_r = NULL;
|
|
|
+
|
|
|
+ // check if we have a version query, and return version directly from
|
|
|
+ // schema instead of loading it from redis
|
|
|
+ if (_nc == 1 && _n >= 1 && VAL_TYPE(&_v[0]) == DB1_STR &&
|
|
|
+ CON_TABLE(_h)->len == strlen("version") && strncmp(CON_TABLE(_h)->s, "version", CON_TABLE(_h)->len) == 0 &&
|
|
|
+ _k[0]->len == strlen("table_name") && strncmp(_k[0]->s, "table_name", _k[0]->len) == 0 &&
|
|
|
+ _c[0]->len == strlen("table_version") && strncmp(_c[0]->s, "table_version", _c[0]->len) == 0) {
|
|
|
+
|
|
|
+ if (db_redis_return_version(_h, con, &VAL_STR(&_v[0]), _r) == 0) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ // if we fail to return a version from the schema, go query the table, just in case
|
|
|
+ }
|
|
|
+
|
|
|
+ free_op = 0;
|
|
|
+
|
|
|
+ if (_op == NULL) {
|
|
|
+ char *op = "=";
|
|
|
+ free_op = 1;
|
|
|
+ query_ops = (db_op_t*)pkg_malloc(_n * sizeof(db_op_t));
|
|
|
+ if (!query_ops) {
|
|
|
+ LM_ERR("Failed to allocate memory for query op list\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ for (int i = 0; i < _n; ++i) {
|
|
|
+ query_ops[i] = op;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ query_ops = (db_op_t*)_op;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (_n > 0) {
|
|
|
+ if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
|
|
|
+ &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan) != 0) {
|
|
|
+ LM_ERR("failed to build query keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (!keys_count) {
|
|
|
+ if (do_table_scan) {
|
|
|
+ LM_DBG("unable to build query keys, falling back to full table scan\n");
|
|
|
+ } else {
|
|
|
+ LM_DBG("query keys not member of suitable mapping, skip full table scan\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LM_DBG("no columns given to build query keys, falling back to full table scan\n");
|
|
|
+ keys_count = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (db_redis_perform_query(_h, con, _k, _v, query_ops, _c, _n, _nc, _r,
|
|
|
+ &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan) != 0) {
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ LM_DBG("done performing query\n");
|
|
|
+
|
|
|
+ if (free_op && query_ops) {
|
|
|
+ pkg_free(query_ops);
|
|
|
+ }
|
|
|
+ db_redis_key_free(&keys);
|
|
|
+
|
|
|
+ if (manual_keys) {
|
|
|
+ pkg_free(manual_keys);
|
|
|
+ }
|
|
|
+
|
|
|
+ db_redis_consume_replies(con);
|
|
|
+ return 0;
|
|
|
+
|
|
|
+error:
|
|
|
+ LM_ERR("failed to do the query\n");
|
|
|
+ if (free_op && query_ops) {
|
|
|
+ pkg_free(query_ops);
|
|
|
+ }
|
|
|
+ db_redis_key_free(&keys);
|
|
|
+ if (manual_keys) {
|
|
|
+ pkg_free(manual_keys);
|
|
|
+ }
|
|
|
+ db_redis_consume_replies(con);
|
|
|
+
|
|
|
+
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ * Execute a raw SQL query
|
|
|
+ */
|
|
|
+int db_redis_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r)
|
|
|
+{
|
|
|
+ LM_DBG("perform redis raw query\n");
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ * Insert a row into specified table
|
|
|
+ * _h: structure representing database connection
|
|
|
+ * _k: key names
|
|
|
+ * _v: values of the keys
|
|
|
+ * _n: number of key=value pairs
|
|
|
+ */
|
|
|
+int db_redis_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, const int _n)
|
|
|
+{
|
|
|
+ km_redis_con_t *con = NULL;
|
|
|
+ redis_key_t *key = NULL;
|
|
|
+ int keys_count = 0;
|
|
|
+ redis_key_t *type_keys = NULL;
|
|
|
+ int type_keys_count = 0;
|
|
|
+ redis_key_t *query_v = NULL;
|
|
|
+ redisReply *reply = NULL;
|
|
|
+
|
|
|
+ con = REDIS_CON(_h);
|
|
|
+ if (con && con->con == NULL) {
|
|
|
+ if (db_redis_connect(con) != 0) {
|
|
|
+ LM_ERR("Failed to reconnect to server\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(con == NULL || con->id == NULL || con->con == NULL) {
|
|
|
+ LM_ERR("connection to server is null\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ if(CON_TABLE(_h)->s == NULL) {
|
|
|
+ LM_ERR("prefix (table) name not set\n");
|
|
|
+ return -1;
|
|
|
+ } else {
|
|
|
+ LM_DBG("inserting to prefix (table) '%.*s'\n",
|
|
|
+ CON_TABLE(_h)->len, CON_TABLE(_h)->s);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (db_redis_build_entry_keys(con, CON_TABLE(_h), _k, _v, _n,
|
|
|
+ &key, &keys_count) != 0) {
|
|
|
+ LM_ERR("failed to build entry keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_build_type_keys(con, CON_TABLE(_h), _k, _v, _n,
|
|
|
+ &type_keys, &type_keys_count) != 0) {
|
|
|
+ LM_ERR("failed to build type keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (db_redis_key_add_string(&query_v, "HMSET", 5) != 0) {
|
|
|
+ LM_ERR("Failed to add hmset command to insert query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, &key->key) != 0) {
|
|
|
+ LM_ERR("Failed to add key to insert query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < _n; ++i) {
|
|
|
+ str *k = _k[i];
|
|
|
+ str v;
|
|
|
+
|
|
|
+ if (db_redis_key_add_str(&query_v, k) != 0) {
|
|
|
+ LM_ERR("Failed to add column name to insert query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_val2str(&(_v[i]), &v) != 0) {
|
|
|
+ LM_ERR("Failed to allocate memory for query value\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, &v) != 0) {
|
|
|
+ LM_ERR("Failed to add column value to insert query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ pkg_free(v.s);
|
|
|
+ }
|
|
|
+
|
|
|
+ reply = db_redis_command_argv(con, query_v);
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+
|
|
|
+ for (redis_key_t *k = type_keys; k; k = k->next) {
|
|
|
+ str *type_key = &k->key;
|
|
|
+
|
|
|
+ LM_DBG("inserting entry key '%.*s' to type map '%.*s'\n",
|
|
|
+ key->key.len, key->key.s, type_key->len, type_key->s);
|
|
|
+
|
|
|
+ if (db_redis_key_add_string(&query_v, "SADD", 4) != 0) {
|
|
|
+ LM_ERR("Failed to set sadd command to post-insert query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, type_key) != 0) {
|
|
|
+ LM_ERR("Failed to add map key to post-insert query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (db_redis_key_add_str(&query_v, &key->key) != 0) {
|
|
|
+ LM_ERR("Failed to set entry key to post-insert query\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ reply = db_redis_command_argv(con, query_v);
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+ db_redis_check_reply(con, reply, error);
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+ }
|
|
|
+
|
|
|
+ db_redis_key_free(&key);
|
|
|
+ db_redis_key_free(&type_keys);
|
|
|
+ db_redis_consume_replies(con);
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+error:
|
|
|
+ db_redis_key_free(&key);
|
|
|
+ db_redis_key_free(&type_keys);
|
|
|
+ db_redis_key_free(&query_v);
|
|
|
+
|
|
|
+ if (reply)
|
|
|
+ db_redis_free_reply(&reply);
|
|
|
+
|
|
|
+ db_redis_consume_replies(con);
|
|
|
+
|
|
|
+ LM_ERR("failed to do the insert\n");
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ * Delete a row from the specified table
|
|
|
+ * _h: structure representing database connection
|
|
|
+ * _k: key names
|
|
|
+ * _o: operators
|
|
|
+ * _v: values of the keys that must match
|
|
|
+ * _n: number of key=value pairs
|
|
|
+ */
|
|
|
+int db_redis_delete(const db1_con_t* _h, const db_key_t* _k,
|
|
|
+ const db_op_t* _op, const db_val_t* _v, const int _n)
|
|
|
+{
|
|
|
+ km_redis_con_t *con = NULL;
|
|
|
+ redis_key_t *keys = NULL;
|
|
|
+ int keys_count = 0;
|
|
|
+ int *manual_keys = NULL;
|
|
|
+ int manual_keys_count = 0;
|
|
|
+ int free_op = 0;
|
|
|
+ int do_table_scan = 0;
|
|
|
+ db_op_t *query_ops = NULL;
|
|
|
+
|
|
|
+ // TODO: optimize mapping-based manual post-check (remove check for keys already
|
|
|
+ // in type query key)
|
|
|
+
|
|
|
+ con = REDIS_CON(_h);
|
|
|
+ if (con && con->con == NULL) {
|
|
|
+ if (db_redis_connect(con) != 0) {
|
|
|
+ LM_ERR("Failed to reconnect to server\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(con == NULL || con->id == NULL || con->con == NULL) {
|
|
|
+ LM_ERR("connection to server is null\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ if(CON_TABLE(_h)->s == NULL) {
|
|
|
+ LM_ERR("prefix (table) name not set\n");
|
|
|
+ return -1;
|
|
|
+ } else {
|
|
|
+ LM_DBG("deleting from prefix (table) '%.*s'\n",
|
|
|
+ CON_TABLE(_h)->len, CON_TABLE(_h)->s);
|
|
|
+ }
|
|
|
+
|
|
|
+ free_op = 0;
|
|
|
+
|
|
|
+ if (_op == NULL) {
|
|
|
+ char *op = "=";
|
|
|
+ free_op = 1;
|
|
|
+ query_ops = (db_op_t*)pkg_malloc(_n * sizeof(db_op_t));
|
|
|
+ if (!query_ops) {
|
|
|
+ LM_ERR("Failed to allocate memory for query op list\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ for (int i = 0; i < _n; ++i) {
|
|
|
+ query_ops[i] = op;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ query_ops = (db_op_t*)_op;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (_n > 0) {
|
|
|
+ if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
|
|
|
+ &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan) != 0) {
|
|
|
+ LM_ERR("failed to build query keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (!keys_count) {
|
|
|
+ if (do_table_scan) {
|
|
|
+ LM_DBG("unable to build query keys, falling back to full table scan\n");
|
|
|
+ } else {
|
|
|
+ LM_DBG("query keys not member of suitable mapping, skip full table scan\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LM_DBG("no columns given to build query keys, falling back to full table scan\n");
|
|
|
+ keys_count = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (db_redis_perform_delete(_h, con, _k, _v, query_ops, _n,
|
|
|
+ keys, keys_count, manual_keys, manual_keys_count, do_table_scan) != 0) {
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ LM_DBG("done performing delete\n");
|
|
|
+
|
|
|
+ if (free_op && query_ops) {
|
|
|
+ pkg_free(query_ops);
|
|
|
+ }
|
|
|
+ db_redis_key_free(&keys);
|
|
|
+ if (manual_keys)
|
|
|
+ pkg_free(manual_keys);
|
|
|
+ db_redis_consume_replies(con);
|
|
|
+
|
|
|
+ return 0;
|
|
|
+
|
|
|
+error:
|
|
|
+ LM_ERR("failed to do the query\n");
|
|
|
+ if (free_op && query_ops) {
|
|
|
+ pkg_free(query_ops);
|
|
|
+ }
|
|
|
+ db_redis_key_free(&keys);
|
|
|
+ if (manual_keys)
|
|
|
+ pkg_free(manual_keys);
|
|
|
+ db_redis_consume_replies(con);
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ * Update some rows in the specified table
|
|
|
+ * _h: structure representing database connection
|
|
|
+ * _k: key names
|
|
|
+ * _op: operators
|
|
|
+ * _v: values of the keys that must match
|
|
|
+ * _uk: updated columns
|
|
|
+ * _uv: updated values of the columns
|
|
|
+ * _n: number of key=value pairs
|
|
|
+ * _un: number of columns to update
|
|
|
+ */
|
|
|
+int db_redis_update(const db1_con_t* _h, const db_key_t* _k,
|
|
|
+ const db_op_t* _op, const db_val_t* _v, const db_key_t* _uk,
|
|
|
+ const db_val_t* _uv, const int _n, const int _nu)
|
|
|
+{
|
|
|
+ km_redis_con_t *con = NULL;
|
|
|
+ int free_op = 0;
|
|
|
+ int do_table_scan = 0;
|
|
|
+
|
|
|
+ redis_key_t *keys = NULL;
|
|
|
+ int keys_count = 0;
|
|
|
+ int *manual_keys = NULL;
|
|
|
+ int manual_keys_count = 0;
|
|
|
+ db_op_t *query_ops = NULL;
|
|
|
+
|
|
|
+ // TODO: optimize mapping-based manual post-check (remove check for keys already
|
|
|
+ // in type query key)
|
|
|
+
|
|
|
+ con = REDIS_CON(_h);
|
|
|
+ if (con && con->con == NULL) {
|
|
|
+ if (db_redis_connect(con) != 0) {
|
|
|
+ LM_ERR("Failed to reconnect to server\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(con == NULL || con->id == NULL || con->con == NULL) {
|
|
|
+ LM_ERR("connection to server is null\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ if(CON_TABLE(_h)->s == NULL) {
|
|
|
+ LM_ERR("prefix (table) name not set\n");
|
|
|
+ return -1;
|
|
|
+ } else {
|
|
|
+ LM_DBG("updating prefix (table) '%.*s'\n",
|
|
|
+ CON_TABLE(_h)->len, CON_TABLE(_h)->s);
|
|
|
+ }
|
|
|
+
|
|
|
+ free_op = 0;
|
|
|
+
|
|
|
+ if (_op == NULL) {
|
|
|
+ char *op = "=";
|
|
|
+ free_op = 1;
|
|
|
+ query_ops = (db_op_t*)pkg_malloc(_n * sizeof(db_op_t));
|
|
|
+ if (!query_ops) {
|
|
|
+ LM_ERR("Failed to allocate memory for query op list\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ for (int i = 0; i < _n; ++i) {
|
|
|
+ query_ops[i] = op;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ query_ops = (db_op_t*)_op;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (_n > 0) {
|
|
|
+ if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
|
|
|
+ &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan) != 0) {
|
|
|
+ LM_ERR("failed to build query keys\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ if (!keys_count) {
|
|
|
+ if (do_table_scan) {
|
|
|
+ LM_DBG("unable to build query keys, falling back to full table scan\n");
|
|
|
+ } else {
|
|
|
+ LM_DBG("query keys not member of suitable mapping, skip full table scan\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LM_DBG("no columns given to build query keys, falling back to full table scan\n");
|
|
|
+ keys_count = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (db_redis_perform_update(_h, con, _k, _v, query_ops, _uk, _uv, _n, _nu,
|
|
|
+ &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan) != 0) {
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ LM_DBG("done performing update\n");
|
|
|
+
|
|
|
+ if (free_op && query_ops) {
|
|
|
+ pkg_free(query_ops);
|
|
|
+ }
|
|
|
+ db_redis_key_free(&keys);
|
|
|
+
|
|
|
+ if (manual_keys) {
|
|
|
+ pkg_free(manual_keys);
|
|
|
+ }
|
|
|
+ db_redis_consume_replies(con);
|
|
|
+ return 0;
|
|
|
+
|
|
|
+error:
|
|
|
+ LM_ERR("failed to do the query\n");
|
|
|
+ if (free_op && query_ops) {
|
|
|
+ pkg_free(query_ops);
|
|
|
+ }
|
|
|
+ db_redis_key_free(&keys);
|
|
|
+ if (manual_keys) {
|
|
|
+ pkg_free(manual_keys);
|
|
|
+ }
|
|
|
+ db_redis_consume_replies(con);
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ * Just like insert, but replace the row if it exists
|
|
|
+ */
|
|
|
+int db_redis_replace(const db1_con_t* _h, const db_key_t* _k,
|
|
|
+ const db_val_t* _v, const int _n, const int _un, const int _m) {
|
|
|
+ LM_DBG("perform redis replace\n");
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ * Store name of table that will be used by
|
|
|
+ * subsequent database functions
|
|
|
+ */
|
|
|
+int db_redis_use_table(db1_con_t* _h, const str* _t) {
|
|
|
+ return db_use_table(_h, _t);
|
|
|
+}
|
|
|
+
|
|
|
+int db_redis_free_result(db1_con_t* _h, db1_res_t* _r) {
|
|
|
+ LM_DBG("perform redis free result\n");
|
|
|
+
|
|
|
+ if(!_r)
|
|
|
+ return -1;
|
|
|
+ db_free_result(_r);
|
|
|
+ return 0;
|
|
|
+}
|