Browse Source

H2O: Optimize the cached queries test (#3904)

The main idea is to use several caches behind an interface that makes
them appear as a single one. Every cache operation chooses one of
them based on the key hash value, and while all subcaches are
multithreaded, the interface in front of them does not modify
anything, so it does not contain any critical sections. As a result,
if several threads access the cache, contention will be minimized,
provided that most of the time they use keys that map to different
subcaches.
Anton Kirilov 7 years ago
parent
commit
ebd92451a2

+ 97 - 0
frameworks/C/h2o/src/cache.c

@@ -0,0 +1,97 @@
+/*
+ Copyright (c) 2018 Anton Valentinov Kirilov
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
+ associated documentation files (the "Software"), to deal in the Software without restriction,
+ including without limitation the rights to use, copy, modify, merge, publish, distribute,
+ sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all copies or
+ substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
+ NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT
+ OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#include <assert.h>
+#include <string.h>
+#include <h2o/cache.h>
+
+#include "cache.h"
+#include "utility.h"
+
+static h2o_cache_t *get_cache(cache_t *cache, h2o_cache_hashcode_t keyhash)
+{
+	assert(is_power_of_2(cache->cache_num));
+	return cache->cache[keyhash & (cache->cache_num - 1)];
+}
+
+int cache_create(size_t thread_num,
+                 size_t capacity,
+                 uint64_t duration,
+                 void (*destroy_cb)(h2o_iovec_t value),
+                 cache_t *cache)
+{
+	int ret = EXIT_SUCCESS;
+
+	memset(cache, 0, sizeof(*cache));
+	// Rounding up to a power of 2 simplifies the calculations a little bit, and, as any increase in
+	// the number of caches, potentially reduces thread contention.
+	cache->cache_num = round_up_to_power_of_2(thread_num);
+	cache->cache_num = MAX(cache->cache_num, 1);
+	capacity = (capacity + cache->cache_num - 1) / cache->cache_num;
+	cache->cache = malloc(cache->cache_num * sizeof(*cache->cache));
+
+	if (cache->cache)
+		for (size_t i = 0; i < cache->cache_num; i++) {
+			cache->cache[i] = h2o_cache_create(H2O_CACHE_FLAG_MULTITHREADED,
+			                                   capacity,
+			                                   duration,
+			                                   destroy_cb);
+
+			if (!cache->cache[i]) {
+				cache->cache_num = i;
+				cache_destroy(cache);
+				cache->cache = NULL;
+				ret = EXIT_FAILURE;
+				break;
+			}
+		}
+	else
+		ret = EXIT_FAILURE;
+
+	return ret;
+}
+
+void cache_destroy(cache_t *cache)
+{
+	if (cache->cache) {
+		for (size_t i = 0; i < cache->cache_num; i++)
+			h2o_cache_destroy(cache->cache[i]);
+
+		free(cache->cache);
+	}
+}
+
+h2o_cache_ref_t *cache_fetch(cache_t *cache, uint64_t now, h2o_iovec_t key)
+{
+	const h2o_cache_hashcode_t keyhash = h2o_cache_calchash(key.base, key.len);
+
+	return h2o_cache_fetch(get_cache(cache, keyhash), now, key, keyhash);
+}
+
+void cache_release(cache_t *cache, h2o_cache_ref_t *ref)
+{
+	h2o_cache_release(get_cache(cache, h2o_cache_calchash(ref->key.base, ref->key.len)), ref);
+}
+
+int cache_set(uint64_t now, h2o_iovec_t key, h2o_iovec_t value, cache_t *cache)
+{
+	const h2o_cache_hashcode_t keyhash = h2o_cache_calchash(key.base, key.len);
+
+	return h2o_cache_set(get_cache(cache, keyhash), now, key, keyhash, value);
+}

+ 41 - 0
frameworks/C/h2o/src/cache.h

@@ -0,0 +1,41 @@
+/*
+ Copyright (c) 2018 Anton Valentinov Kirilov
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
+ associated documentation files (the "Software"), to deal in the Software without restriction,
+ including without limitation the rights to use, copy, modify, merge, publish, distribute,
+ sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all copies or
+ substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
+ NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT
+ OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#ifndef CACHE_H_
+
+#define CACHE_H_
+
+#include <h2o/cache.h>
+
+typedef struct {
+	h2o_cache_t **cache;
+	size_t cache_num;
+} cache_t;
+
+int cache_create(size_t thread_num,
+                 size_t capacity,
+                 uint64_t duration,
+                 void (*destroy_cb)(h2o_iovec_t value),
+                 cache_t *cache);
+void cache_destroy(cache_t *cache);
+h2o_cache_ref_t *cache_fetch(cache_t *cache, uint64_t now, h2o_iovec_t key);
+void cache_release(cache_t *cache, h2o_cache_ref_t *ref);
+int cache_set(uint64_t now, h2o_iovec_t key, h2o_iovec_t value, cache_t *cache);
+
+#endif // CACHE_H_

+ 8 - 16
frameworks/C/h2o/src/main.c

@@ -26,12 +26,12 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
-#include <h2o/cache.h>
 #include <h2o/serverutil.h>
 #include <sys/resource.h>
 #include <sys/signalfd.h>
 #include <sys/time.h>
 
+#include "cache.h"
 #include "error.h"
 #include "event_loop.h"
 #include "request_handler.h"
@@ -77,10 +77,7 @@ static void free_global_data(global_data_t *global_data)
 		mustache_free(&api, global_data->fortunes_template);
 	}
 
-	if (global_data->world_cache)
-		h2o_cache_destroy(global_data->world_cache);
-
-	CHECK_ERROR(pthread_mutex_destroy, &global_data->world_cache_lock);
+	cache_destroy(&global_data->world_cache);
 	h2o_config_dispose(&global_data->h2o_config);
 
 	if (global_data->ssl_ctx)
@@ -121,7 +118,6 @@ static size_t get_maximum_cache_line_size(void)
 
 static int initialize_global_data(const config_t *config, global_data_t *global_data)
 {
-	pthread_mutexattr_t attr;
 	sigset_t signals;
 
 	memset(global_data, 0, sizeof(*global_data));
@@ -134,16 +130,12 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
 	CHECK_ERRNO_RETURN(global_data->signal_fd, signalfd, -1, &signals, SFD_NONBLOCK | SFD_CLOEXEC);
 	global_data->fortunes_template = get_fortunes_template(config->template_path);
 	h2o_config_init(&global_data->h2o_config);
-	CHECK_ERROR(pthread_mutexattr_init, &attr);
-	CHECK_ERROR(pthread_mutexattr_settype, &attr, PTHREAD_MUTEX_ADAPTIVE_NP);
-	CHECK_ERROR(pthread_mutex_init, &global_data->world_cache_lock, &attr);
-	CHECK_ERROR(pthread_mutexattr_destroy, &attr);
-	global_data->world_cache = h2o_cache_create(0,
-	                                            config->world_cache_capacity,
-	                                            config->world_cache_duration,
-	                                            free_world_cache_entry);
-
-	if (!global_data->world_cache)
+
+	if (cache_create(config->thread_num,
+	                 config->world_cache_capacity,
+	                 config->world_cache_duration,
+	                 free_world_cache_entry,
+	                 &global_data->world_cache))
 		goto error;
 
 	if (config->cert && config->key)

+ 20 - 0
frameworks/C/h2o/src/utility.c

@@ -18,6 +18,8 @@
 */
 
 #include <assert.h>
+#include <limits.h>
+#include <stdbool.h>
 #include <stdint.h>
 #include <stdlib.h>
 #include <yajl/yajl_gen.h>
@@ -86,3 +88,21 @@ uint32_t get_random_number(uint32_t max_rand, unsigned int *seed)
 
 	return ret / bucket_size;
 }
+
+bool is_power_of_2(size_t x)
+{
+	return !!x & !(x & (x - 1));
+}
+
+size_t round_up_to_power_of_2(size_t x)
+{
+	static_assert(sizeof(size_t) == sizeof(unsigned long),
+	              "The size_t type must have the same size as unsigned long.");
+
+	size_t ret = (SIZE_MAX ^ SIZE_MAX >> 1) >> __builtin_clzl(x);
+
+	if (x - ret)
+		ret <<= 1;
+
+	return ret;
+}

+ 4 - 3
frameworks/C/h2o/src/utility.h

@@ -22,7 +22,6 @@
 #define UTILITY_H_
 
 #include <h2o.h>
-#include <pthread.h>
 #include <stdint.h>
 #include <h2o/cache.h>
 #include <openssl/ssl.h>
@@ -30,6 +29,7 @@
 #include <yajl/yajl_gen.h>
 #include <mustache.h>
 
+#include "cache.h"
 #include "list.h"
 
 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a)))
@@ -69,12 +69,11 @@ typedef struct {
 	h2o_socket_t *signals;
 	SSL_CTX *ssl_ctx;
 	global_thread_data_t *global_thread_data;
-	h2o_cache_t *world_cache;
 	size_t memory_alignment;
 	int signal_fd;
 	bool shutdown;
+	cache_t world_cache;
 	h2o_globalconf_t h2o_config;
-	pthread_mutex_t world_cache_lock;
 } global_data_t;
 
 typedef struct {
@@ -85,5 +84,7 @@ typedef struct {
 void free_json_generator(json_generator_t *gen, list_t **pool, size_t *gen_num, size_t max_gen);
 json_generator_t *get_json_generator(list_t **pool, size_t *gen_num);
 uint32_t get_random_number(uint32_t max_rand, unsigned int *seed);
+bool is_power_of_2(size_t x);
+size_t round_up_to_power_of_2(size_t x);
 
 #endif // UTILITY_H_

+ 10 - 24
frameworks/C/h2o/src/world.c

@@ -20,7 +20,6 @@
 #include <assert.h>
 #include <ctype.h>
 #include <h2o.h>
-#include <pthread.h>
 #include <stdbool.h>
 #include <stddef.h>
 #include <stdint.h>
@@ -32,6 +31,7 @@
 #include <yajl/yajl_gen.h>
 
 #include "bitset.h"
+#include "cache.h"
 #include "database.h"
 #include "error.h"
 #include "request_handler.h"
@@ -88,10 +88,7 @@ static int compare_items(const void *x, const void *y);
 static void complete_multiple_query(multiple_query_ctx_t *query_ctx);
 static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req);
 static void do_updates(multiple_query_ctx_t *query_ctx);
-static void fetch_from_cache(uint64_t now,
-                             h2o_cache_t *world_cache,
-                             pthread_mutex_t *world_cache_lock,
-                             multiple_query_ctx_t *query_ctx);
+static void fetch_from_cache(uint64_t now, cache_t *world_cache, multiple_query_ctx_t *query_ctx);
 static size_t get_query_number(h2o_req_t *req);
 static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed);
 static void on_multiple_query_error(db_query_param_t *param, const char *error_string);
@@ -198,8 +195,7 @@ static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req)
 
 		if (use_cache) {
 			fetch_from_cache(h2o_now(ctx->event_loop.h2o_ctx.loop),
-			                 ctx->global_data->world_cache,
-			                 &ctx->global_data->world_cache_lock,
+			                 &ctx->global_data->world_cache,
 			                 query_ctx);
 
 			if (query_ctx->num_result == query_ctx->num_query) {
@@ -310,30 +306,23 @@ error:
 	send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
 }
 
-static void fetch_from_cache(uint64_t now,
-                             h2o_cache_t *world_cache,
-                             pthread_mutex_t *world_cache_lock,
-                             multiple_query_ctx_t *query_ctx)
+static void fetch_from_cache(uint64_t now, cache_t *world_cache, multiple_query_ctx_t *query_ctx)
 {
 	h2o_iovec_t key = {.len = sizeof(query_ctx->res->id)};
 
-	CHECK_ERROR(pthread_mutex_lock, world_cache_lock);
-
 	for (size_t i = 0; i < query_ctx->num_query; i++) {
 		key.base = (char *) &query_ctx->res[i].id;
 
-		h2o_cache_ref_t * const r = h2o_cache_fetch(world_cache, now, key, 0);
+		h2o_cache_ref_t * const r = cache_fetch(world_cache, now, key);
 
 		if (r) {
 			query_ctx->res[i].id = query_ctx->res[query_ctx->num_result].id;
 			memcpy(query_ctx->res + query_ctx->num_result++,
 			       r->value.base,
 			       sizeof(*query_ctx->res));
-			h2o_cache_release(world_cache, r);
+			cache_release(world_cache, r);
 		}
 	}
-
-	CHECK_ERROR(pthread_mutex_unlock, world_cache_lock);
 }
 
 static size_t get_query_number(h2o_req_t *req)
@@ -410,13 +399,10 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
 				const h2o_iovec_t value = {.base = (char *) r, .len = sizeof(*r)};
 
 				*r = query_ctx->res[query_ctx->num_result];
-				CHECK_ERROR(pthread_mutex_lock, &ctx->global_data->world_cache_lock);
-				h2o_cache_set(ctx->global_data->world_cache,
-				              h2o_now(ctx->event_loop.h2o_ctx.loop),
-				              key,
-				              0,
-				              value);
-				CHECK_ERROR(pthread_mutex_unlock, &ctx->global_data->world_cache_lock);
+				cache_set(h2o_now(ctx->event_loop.h2o_ctx.loop),
+				          key,
+				          value,
+				          &ctx->global_data->world_cache);
 			}
 		}