浏览代码

db_redis: use sscan

Sipwise Development Team 4 年之前
父节点
当前提交
41c4736367
共有 1 个文件被更改,包括 84 次插入33 次删除
  1. 84 33
      src/modules/db_redis/redis_dbase.c

+ 84 - 33
src/modules/db_redis/redis_dbase.c

@@ -646,7 +646,7 @@ static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name,
 		const db_key_t *_k, const db_val_t *_v, const db_op_t *_op,
 		const int _n, redis_key_t **query_keys, int *query_keys_count,
 		int **manual_keys, int *manual_keys_count, int *do_table_scan,
-		uint64_t *ts_scan_start, str *ts_scan_key)
+		uint64_t *ts_scan_start, str *ts_scan_key, str *ts_scan_table)
 {
 
 	struct str_hash_entry *table_e;
@@ -775,6 +775,22 @@ static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name,
 						keyname.len, keyname.s,
 						(unsigned long long)*ts_scan_start);
 				*ts_scan_key = keyname;
+				if(ts_scan_table) {
+					if(ts_scan_table->s)
+						pkg_free(ts_scan_table->s);
+					// <version>:<table>::index::<type>
+					ts_scan_table->len = table->version_code.len
+										 + table_name->len + 9 + type->type.len;
+					ts_scan_table->s = pkg_malloc(ts_scan_table->len + 1);
+					if(!ts_scan_table->s) {
+						PKG_MEM_ERROR;
+						goto err;
+					}
+					sprintf(ts_scan_table->s, "%.*s%.*s::index::%.*s",
+							table->version_code.len, table->version_code.s,
+							table_name->len, table_name->s, type->type.len,
+							type->type.s);
+				}
 				keyname.s = NULL;
 			} else if(keyname.s) {
 				pkg_free(keyname.s);
@@ -811,9 +827,9 @@ err:
 }
 
 static int db_redis_scan_query_keys_pattern(km_redis_con_t *con,
-		const str *match_pattern, const int _n, redis_key_t **query_keys,
-		int *query_keys_count, int **manual_keys, int *manual_keys_count,
-		unsigned int match_count_start_val)
+		const str *match_pattern, const str *index_key, const int _n,
+		redis_key_t **query_keys, int *query_keys_count, int **manual_keys,
+		int *manual_keys_count, unsigned int match_count_start_val)
 {
 
 	size_t i = 0;
@@ -838,9 +854,21 @@ static int db_redis_scan_query_keys_pattern(km_redis_con_t *con,
 	do {
 		snprintf(cursor_str, sizeof(cursor_str), "%lu", cursor);
 
-		if(db_redis_key_add_string(&query_v, "SCAN", 4) != 0) {
-			LM_ERR("Failed to add scan command to scan query\n");
-			goto err;
+		if(!index_key || !index_key->len) {
+			if(db_redis_key_add_string(&query_v, "SCAN", 4) != 0) {
+				LM_ERR("Failed to add scan command to scan query\n");
+				goto err;
+			}
+		} else {
+			if(db_redis_key_add_string(&query_v, "SSCAN", 5) != 0) {
+				LM_ERR("Failed to add scan command to scan query\n");
+				goto err;
+			}
+			if(db_redis_key_add_string(&query_v, index_key->s, index_key->len)
+					!= 0) {
+				LM_ERR("Failed to add scan command to scan query\n");
+				goto err;
+			}
 		}
 		if(db_redis_key_add_string(&query_v, cursor_str, strlen(cursor_str))
 				!= 0) {
@@ -1031,7 +1059,7 @@ return -1;
 static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
 		const int _n, redis_key_t **query_keys, int *query_keys_count,
 		int **manual_keys, int *manual_keys_count, uint64_t ts_scan_start,
-		const str *ts_scan_key)
+		const str *ts_scan_key, const str *ts_scan_table)
 {
 
 	struct str_hash_entry *table_e;
@@ -1068,9 +1096,9 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
 		int len = sprintf(match, "%.*s%.*s:entry::*", table->version_code.len,
 				table->version_code.s, table_name->len, table_name->s);
 		str match_pattern = {match, len};
-		ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n,
-				query_keys, query_keys_count, manual_keys, manual_keys_count,
-				1000);
+		ret = db_redis_scan_query_keys_pattern(con, &match_pattern,
+				ts_scan_table, _n, query_keys, query_keys_count, manual_keys,
+				manual_keys_count, 1000);
 		pkg_free(match);
 		return ret;
 	}
@@ -1134,9 +1162,9 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
 				   "'%.*s'\n",
 					len, match);
 
-			ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n,
-					&set_keys, &set_keys_count, manual_keys, manual_keys_count,
-					5000);
+			ret = db_redis_scan_query_keys_pattern(con, &match_pattern,
+					ts_scan_table, _n, &set_keys, &set_keys_count, manual_keys,
+					manual_keys_count, 5000);
 			if(ret)
 				goto out;
 		}
@@ -1181,9 +1209,9 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
 		LM_DBG("running timestamp/int range matching using pattern '%.*s'\n",
 				len, match);
 
-		ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n,
-				&set_keys, &set_keys_count, manual_keys, manual_keys_count,
-				5000);
+		ret = db_redis_scan_query_keys_pattern(con, &match_pattern,
+				ts_scan_table, _n, &set_keys, &set_keys_count, manual_keys,
+				manual_keys_count, 5000);
 		if(ret)
 			goto out;
 	}
@@ -1197,9 +1225,9 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
 		LM_DBG("running timestamp/int range matching using pattern '%.*s'\n",
 				len, match);
 
-		ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n,
-				&set_keys, &set_keys_count, manual_keys, manual_keys_count,
-				5000);
+		ret = db_redis_scan_query_keys_pattern(con, &match_pattern,
+				ts_scan_table, _n, &set_keys, &set_keys_count, manual_keys,
+				manual_keys_count, 5000);
 		if(ret)
 			goto out;
 	}
@@ -1577,7 +1605,7 @@ static int db_redis_perform_query(const db1_con_t *_h, km_redis_con_t *con,
 		const db_key_t *_c, const int _n, const int _nc, db1_res_t **_r,
 		redis_key_t **keys, int *keys_count, int **manual_keys,
 		int *manual_keys_count, int do_table_scan, uint64_t ts_scan_start,
-		const str *ts_scan_key)
+		const str *ts_scan_key, const str *ts_scan_table)
 {
 
 	redisReply *reply = NULL;
@@ -1612,7 +1640,8 @@ static int db_redis_perform_query(const db1_con_t *_h, km_redis_con_t *con,
 					CON_TABLE(_h)->s);
 		}
 		if(db_redis_scan_query_keys(con, CON_TABLE(_h), _n, keys, keys_count,
-				   manual_keys, manual_keys_count, ts_scan_start, ts_scan_key)
+				   manual_keys, manual_keys_count, ts_scan_start, ts_scan_key,
+				   ts_scan_table)
 				!= 0) {
 			LM_ERR("failed to scan query keys\n");
 			goto error;
@@ -1767,7 +1796,7 @@ static int db_redis_perform_delete(const db1_con_t *_h, km_redis_con_t *con,
 		const db_key_t *_k, const db_val_t *_v, const db_op_t *_op,
 		const int _n, redis_key_t **keys, int *keys_count, int **manual_keys,
 		int *manual_keys_count, int do_table_scan, uint64_t ts_scan_start,
-		const str *ts_scan_key)
+		const str *ts_scan_key, const str *ts_scan_table)
 {
 
 	int i = 0, j = 0;
@@ -1805,7 +1834,8 @@ static int db_redis_perform_delete(const db1_con_t *_h, km_redis_con_t *con,
 			LM_WARN("  scan key %d is '%.*s'\n", i, _k[i]->len, _k[i]->s);
 		}
 		if(db_redis_scan_query_keys(con, CON_TABLE(_h), _n, keys, keys_count,
-				   manual_keys, manual_keys_count, ts_scan_start, ts_scan_key)
+				   manual_keys, manual_keys_count, ts_scan_start, ts_scan_key,
+				   ts_scan_table)
 				!= 0) {
 			LM_ERR("failed to scan query keys\n");
 			goto error;
@@ -2081,7 +2111,7 @@ static int db_redis_perform_update(const db1_con_t *_h, km_redis_con_t *con,
 		const db_key_t *_uk, const db_val_t *_uv, const int _n, const int _nu,
 		redis_key_t **keys, int *keys_count, int **manual_keys,
 		int *manual_keys_count, int do_table_scan, uint64_t ts_scan_start,
-		const str *ts_scan_key)
+		const str *ts_scan_key, const str *ts_scan_table)
 {
 
 	redisReply *reply = NULL;
@@ -2113,7 +2143,8 @@ static int db_redis_perform_update(const db1_con_t *_h, km_redis_con_t *con,
 			LM_WARN("  scan key %d is '%.*s'\n", i, _k[i]->len, _k[i]->s);
 		}
 		if(db_redis_scan_query_keys(con, CON_TABLE(_h), _n, keys, keys_count,
-				   manual_keys, manual_keys_count, ts_scan_start, ts_scan_key)
+				   manual_keys, manual_keys_count, ts_scan_start, ts_scan_key,
+				   ts_scan_table)
 				!= 0) {
 			LM_ERR("failed to scan query keys\n");
 			goto error;
@@ -2585,6 +2616,9 @@ int db_redis_query(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 	str ts_scan_key = {
 			0,
 	};
+	str ts_scan_table = {
+			0,
+	};
 
 	redis_key_t *keys = NULL;
 	int keys_count = 0;
@@ -2659,7 +2693,7 @@ int db_redis_query(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 	if(_n > 0) {
 		if(db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
 				   &keys, &keys_count, &manual_keys, &manual_keys_count,
-				   &do_table_scan, &ts_scan_start, &ts_scan_key)
+				   &do_table_scan, &ts_scan_start, &ts_scan_key, &ts_scan_table)
 				!= 0) {
 			LM_ERR("failed to build query keys\n");
 			goto error;
@@ -2682,7 +2716,7 @@ int db_redis_query(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 
 	if(db_redis_perform_query(_h, con, _k, _v, query_ops, _c, _n, _nc, _r,
 			   &keys, &keys_count, &manual_keys, &manual_keys_count,
-			   do_table_scan, ts_scan_start, &ts_scan_key)
+			   do_table_scan, ts_scan_start, &ts_scan_key, &ts_scan_table)
 			!= 0) {
 		goto error;
 	}
@@ -2699,7 +2733,8 @@ int db_redis_query(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 	}
 	if(ts_scan_key.s)
 		pkg_free(ts_scan_key.s);
-
+	if(ts_scan_table.s)
+		pkg_free(ts_scan_table.s);
 	db_redis_consume_replies(con);
 	return 0;
 
@@ -2714,6 +2749,8 @@ error:
 	}
 	if(ts_scan_key.s)
 		pkg_free(ts_scan_key.s);
+	if(ts_scan_table.s)
+		pkg_free(ts_scan_table.s);
 	db_redis_consume_replies(con);
 
 
@@ -2905,6 +2942,9 @@ int db_redis_delete(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 	str ts_scan_key = {
 			0,
 	};
+	str ts_scan_table = {
+			0,
+	};
 	db_op_t *query_ops = NULL;
 	int i;
 
@@ -2950,7 +2990,7 @@ int db_redis_delete(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 	if(_n > 0) {
 		if(db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
 				   &keys, &keys_count, &manual_keys, &manual_keys_count,
-				   &do_table_scan, &ts_scan_start, &ts_scan_key)
+				   &do_table_scan, &ts_scan_start, &ts_scan_key, &ts_scan_table)
 				!= 0) {
 			LM_ERR("failed to build query keys\n");
 			goto error;
@@ -2973,7 +3013,7 @@ int db_redis_delete(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 
 	if(db_redis_perform_delete(_h, con, _k, _v, query_ops, _n, &keys,
 			   &keys_count, &manual_keys, &manual_keys_count, do_table_scan,
-			   ts_scan_start, &ts_scan_key)
+			   ts_scan_start, &ts_scan_key, &ts_scan_table)
 			!= 0) {
 		goto error;
 	}
@@ -2988,6 +3028,8 @@ int db_redis_delete(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 		pkg_free(manual_keys);
 	if(ts_scan_key.s)
 		pkg_free(ts_scan_key.s);
+	if(ts_scan_table.s)
+		pkg_free(ts_scan_table.s);
 	db_redis_consume_replies(con);
 
 	return 0;
@@ -3002,6 +3044,8 @@ error:
 		pkg_free(manual_keys);
 	if(ts_scan_key.s)
 		pkg_free(ts_scan_key.s);
+	if(ts_scan_table.s)
+		pkg_free(ts_scan_table.s);
 	db_redis_consume_replies(con);
 	return -1;
 }
@@ -3028,6 +3072,9 @@ int db_redis_update(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 	str ts_scan_key = {
 			0,
 	};
+	str ts_scan_table = {
+			0,
+	};
 
 	redis_key_t *keys = NULL;
 	int keys_count = 0;
@@ -3078,7 +3125,7 @@ int db_redis_update(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 	if(_n > 0) {
 		if(db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
 				   &keys, &keys_count, &manual_keys, &manual_keys_count,
-				   &do_table_scan, &ts_scan_start, &ts_scan_key)
+				   &do_table_scan, &ts_scan_start, &ts_scan_key, &ts_scan_table)
 				!= 0) {
 			LM_ERR("failed to build query keys\n");
 			goto error;
@@ -3100,7 +3147,7 @@ int db_redis_update(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 
 	if(db_redis_perform_update(_h, con, _k, _v, query_ops, _uk, _uv, _n, _nu,
 			   &keys, &keys_count, &manual_keys, &manual_keys_count,
-			   do_table_scan, ts_scan_start, &ts_scan_key)
+			   do_table_scan, ts_scan_start, &ts_scan_key, &ts_scan_table)
 			!= 0) {
 		goto error;
 	}
@@ -3117,6 +3164,8 @@ int db_redis_update(const db1_con_t *_h, const db_key_t *_k, const db_op_t *_op,
 	}
 	if(ts_scan_key.s)
 		pkg_free(ts_scan_key.s);
+	if(ts_scan_table.s)
+		pkg_free(ts_scan_table.s);
 	db_redis_consume_replies(con);
 	return 0;
 
@@ -3131,6 +3180,8 @@ error:
 	}
 	if(ts_scan_key.s)
 		pkg_free(ts_scan_key.s);
+	if(ts_scan_table.s)
+		pkg_free(ts_scan_table.s);
 	db_redis_consume_replies(con);
 	return -1;
 }