Browse Source

H2O: Implement the cached queries test type (#2827)

The implementation uses the in-memory caching mechanism provided by libh2o.
Anton Kirilov 8 years ago
parent
commit
86d982afe6

+ 1 - 0
frameworks/C/h2o/benchmark_config.json

@@ -9,6 +9,7 @@
       "fortune_url": "/fortunes",
       "update_url": "/updates?queries=",
       "plaintext_url": "/plaintext",
+      "cached_query_url": "/cached-worlds?queries=",
       "port": 8080,
       "approach": "Realistic",
       "classification": "Platform",

+ 1 - 0
frameworks/C/h2o/setup.sh

@@ -52,6 +52,7 @@ generate_profile_data()
 	run_curl fortunes
 	run_curl updates?queries=20
 	run_curl plaintext
+	run_curl cached-worlds?queries=20
 	kill -s SIGTERM $H2O_APP_PROFILE_PID
 	wait $H2O_APP_PROFILE_PID
 }

+ 35 - 3
frameworks/C/h2o/src/main.c

@@ -26,6 +26,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
+#include <h2o/cache.h>
 #include <h2o/serverutil.h>
 #include <sys/resource.h>
 #include <sys/signalfd.h>
@@ -40,15 +41,18 @@
 #include "utility.h"
 
 #define DEFAULT_CACHE_LINE_SIZE 128
+#define MS_IN_S 1000
 #define USAGE_MESSAGE \
 	"Usage:\n%s [-a <max connections accepted simultaneously>] [-b <bind address>] " \
-	"[-c <certificate file>] [-d <database connection string>] [-f fortunes template file path] " \
+	"[-c <certificate file>] [-d <database connection string>] " \
+	"[-e <World cache duration in seconds>] [-f fortunes template file path] " \
 	"[-j <max reused JSON generators>] [-k <private key file>] [-l <log path>] " \
 	"[-m <max database connections per thread>] [-p <port>] " \
 	"[-q <max enqueued database queries per thread>] [-r <root directory>] " \
-	"[-s <HTTPS port>] [-t <thread number>]\n"
+	"[-s <HTTPS port>] [-t <thread number>] [-w <World cache capacity in bytes>]\n"
 
 static void free_global_data(global_data_t *global_data);
+static void free_world_cache_entry(h2o_iovec_t value);
 static size_t get_maximum_cache_line_size(void);
 static int initialize_global_data(const config_t *config, global_data_t *global_data);
 static int parse_options(int argc, char *argv[], config_t *config);
@@ -73,12 +77,20 @@ static void free_global_data(global_data_t *global_data)
 		mustache_free(&api, global_data->fortunes_template);
 	}
 
+	if (global_data->world_cache)
+		h2o_cache_destroy(global_data->world_cache);
+
 	h2o_config_dispose(&global_data->h2o_config);
 
 	if (global_data->ssl_ctx)
 		cleanup_openssl(global_data);
 }
 
+static void free_world_cache_entry(h2o_iovec_t value)
+{
+	free(value.base);
+}
+
 static size_t get_maximum_cache_line_size(void)
 {
 	const int name[] = {_SC_LEVEL1_DCACHE_LINESIZE,
@@ -120,6 +132,13 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
 	CHECK_ERRNO_RETURN(global_data->signal_fd, signalfd, -1, &signals, SFD_NONBLOCK | SFD_CLOEXEC);
 	global_data->fortunes_template = get_fortunes_template(config->template_path);
 	h2o_config_init(&global_data->h2o_config);
+	global_data->world_cache = h2o_cache_create(H2O_CACHE_FLAG_MULTITHREADED,
+	                                            config->world_cache_capacity,
+	                                            config->world_cache_duration,
+	                                            free_world_cache_entry);
+
+	if (!global_data->world_cache)
+		goto error;
 
 	if (config->cert && config->key)
 		initialize_openssl(config, global_data);
@@ -170,7 +189,7 @@ static int parse_options(int argc, char *argv[], config_t *config)
 	opterr = 0;
 
 	while (1) {
-		const int opt = getopt(argc, argv, "?a:b:c:d:f:j:k:l:m:p:q:r:s:t:");
+		const int opt = getopt(argc, argv, "?a:b:c:d:e:f:j:k:l:m:p:q:r:s:t:w:");
 
 		if (opt == -1)
 			break;
@@ -203,6 +222,10 @@ static int parse_options(int argc, char *argv[], config_t *config)
 			case 'd':
 				config->db_host = optarg;
 				break;
+			case 'e':
+				PARSE_NUMBER(config->world_cache_duration);
+				config->world_cache_duration *= MS_IN_S;
+				break;
 			case 'f':
 				config->template_path = optarg;
 				break;
@@ -233,6 +256,9 @@ static int parse_options(int argc, char *argv[], config_t *config)
 			case 't':
 				PARSE_NUMBER(config->thread_num);
 				break;
+			case 'w':
+				PARSE_NUMBER(config->world_cache_capacity);
+				break;
 			default:
 				fprintf(stderr, USAGE_MESSAGE, *argv);
 				return EXIT_FAILURE;
@@ -247,6 +273,9 @@ static int parse_options(int argc, char *argv[], config_t *config)
 
 static void set_default_options(config_t *config)
 {
+	if (!config->world_cache_duration)
+		config->world_cache_duration = 3600000;
+
 	if (!config->max_accept)
 		config->max_accept = 10;
 
@@ -262,6 +291,9 @@ static void set_default_options(config_t *config)
 	if (!config->thread_num)
 		config->thread_num = h2o_numproc();
 
+	if (!config->world_cache_capacity)
+		config->world_cache_capacity = 131072;
+
 	if (!config->https_port)
 		config->https_port = 4443;
 }

+ 7 - 0
frameworks/C/h2o/src/request_handler.c

@@ -181,6 +181,13 @@ void register_request_handlers(h2o_hostconf_t *hostconf, h2o_access_log_filehand
 	handler = h2o_create_handler(pathconf, sizeof(*handler));
 	handler->on_req = plaintext;
 
+	if (log_handle)
+		h2o_access_log_register(pathconf, log_handle);
+
+	pathconf = h2o_config_register_path(hostconf, "/cached-worlds", 0);
+	handler = h2o_create_handler(pathconf, sizeof(*handler));
+	handler->on_req = cached_queries;
+
 	if (log_handle)
 		h2o_access_log_register(pathconf, log_handle);
 }

+ 4 - 0
frameworks/C/h2o/src/utility.h

@@ -23,6 +23,7 @@
 
 #include <h2o.h>
 #include <stdint.h>
+#include <h2o/cache.h>
 #include <openssl/ssl.h>
 #include <stdbool.h>
 #include <yajl/yajl_gen.h>
@@ -50,11 +51,13 @@ typedef struct {
 	const char *log;
 	const char *root;
 	const char *template_path;
+	uint64_t world_cache_duration;
 	size_t max_accept;
 	size_t max_db_conn_num;
 	size_t max_json_generator;
 	size_t max_query_num;
 	size_t thread_num;
+	size_t world_cache_capacity;
 	uint16_t https_port;
 	uint16_t port;
 } config_t;
@@ -65,6 +68,7 @@ typedef struct {
 	h2o_socket_t *signals;
 	SSL_CTX *ssl_ctx;
 	global_thread_data_t *global_thread_data;
+	h2o_cache_t *world_cache;
 	size_t memory_alignment;
 	int signal_fd;
 	bool shutdown;

+ 98 - 26
frameworks/C/h2o/src/world.c

@@ -26,6 +26,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <arpa/inet.h>
+#include <h2o/cache.h>
 #include <postgresql/libpq-fe.h>
 #include <yajl/yajl_gen.h>
 
@@ -77,13 +78,18 @@ struct multiple_query_ctx_t {
 	size_t num_result;
 	bool do_update;
 	bool error;
+	bool use_cache;
 	query_result_t res[];
 };
 
 static void cleanup_multiple_query(void *data);
 static int compare_items(const void *x, const void *y);
-static int do_multiple_queries(bool do_update, h2o_req_t *req);
+static void complete_multiple_query(multiple_query_ctx_t *query_ctx);
+static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req);
 static void do_updates(multiple_query_ctx_t *query_ctx);
+static void fetch_from_cache(uint64_t now,
+                             h2o_cache_t *world_cache,
+                             multiple_query_ctx_t *query_ctx);
 static size_t get_query_number(h2o_req_t *req);
 static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed);
 static void on_multiple_query_error(db_query_param_t *param, const char *error_string);
@@ -124,7 +130,30 @@ static int compare_items(const void *x, const void *y)
 	return r1->id < r2->id ? -1 : r1->id > r2->id;
 }
 
-static int do_multiple_queries(bool do_update, h2o_req_t *req)
+static void complete_multiple_query(multiple_query_ctx_t *query_ctx)
+{
+	assert(query_ctx->num_result == query_ctx->num_query);
+
+	if (query_ctx->do_update)
+		do_updates(query_ctx);
+	else {
+		thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
+		                                                      event_loop.h2o_ctx,
+		                                                      query_ctx->req->conn->ctx);
+
+		query_ctx->gen = get_json_generator(&ctx->json_generator, &ctx->json_generator_num);
+
+		if (query_ctx->gen)
+			serialize_items(query_ctx->res,
+			                query_ctx->num_result,
+			                &query_ctx->gen,
+			                query_ctx->req);
+		else
+			send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
+	}
+}
+
+static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req)
 {
 	thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
 	                                                      event_loop.h2o_ctx,
@@ -159,17 +188,31 @@ static int do_multiple_queries(bool do_update, h2o_req_t *req)
 	if (query_ctx) {
 		memset(query_ctx, 0, sz);
 		query_ctx->num_query = num_query;
-		query_ctx->num_query_in_progress = num_query_in_progress;
 		query_ctx->req = req;
 		query_ctx->do_update = do_update;
+		query_ctx->use_cache = use_cache;
 		query_ctx->query_param = (query_param_t *) ((char *) query_ctx + base_size);
 		initialize_ids(num_query, query_ctx->res, &ctx->random_seed);
 
+		if (use_cache) {
+			fetch_from_cache(h2o_now(ctx->event_loop.h2o_ctx.loop),
+			                 ctx->global_data->world_cache,
+			                 query_ctx);
+
+			if (query_ctx->num_result == query_ctx->num_query) {
+				complete_multiple_query(query_ctx);
+				return 0;
+			}
+		}
+
+		query_ctx->num_query_in_progress = MIN(num_query_in_progress,
+		                                       query_ctx->num_query - query_ctx->num_result);
+
 		for (size_t i = 0; i < query_ctx->num_query_in_progress; i++) {
 			query_ctx->query_param[i].ctx = query_ctx;
 			// We need a copy of id because the original may be overwritten
 			// by a completed query.
-			query_ctx->query_param[i].id = htonl(query_ctx->res[i].id);
+			query_ctx->query_param[i].id = htonl(query_ctx->res[query_ctx->num_result + i].id);
 			query_ctx->query_param[i].id_format = 1;
 			query_ctx->query_param[i].id_len = sizeof(query_ctx->query_param[i].id);
 			query_ctx->query_param[i].id_pointer = (const char *) &query_ctx->query_param[i].id;
@@ -264,6 +307,27 @@ error:
 	send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
 }
 
+static void fetch_from_cache(uint64_t now,
+                             h2o_cache_t *world_cache,
+                             multiple_query_ctx_t *query_ctx)
+{
+	h2o_iovec_t key = {.len = sizeof(query_ctx->res->id)};
+
+	for (size_t i = 0; i < query_ctx->num_query; i++) {
+		key.base = (char *) &query_ctx->res[i].id;
+
+		h2o_cache_ref_t * const r = h2o_cache_fetch(world_cache, now, key, 0);
+
+		if (r) {
+			query_ctx->res[i].id = query_ctx->res[query_ctx->num_result].id;
+			memcpy(query_ctx->res + query_ctx->num_result++,
+			       r->value.base,
+			       sizeof(*query_ctx->res));
+			h2o_cache_release(world_cache, r);
+		}
+	}
+}
+
 static size_t get_query_number(h2o_req_t *req)
 {
 	int num_query = 0;
@@ -329,6 +393,23 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
 		                                                      query_ctx->req->conn->ctx);
 
 		process_result(result, query_ctx->res + query_ctx->num_result);
+
+		if (query_ctx->use_cache) {
+			query_result_t * const r = malloc(sizeof(*r));
+
+			if (r) {
+				const h2o_iovec_t key = {.base = (char *) &r->id, .len = sizeof(r->id)};
+				const h2o_iovec_t value = {.base = (char *) r, .len = sizeof(*r)};
+
+				*r = query_ctx->res[query_ctx->num_result];
+				h2o_cache_set(ctx->global_data->world_cache,
+				              h2o_now(ctx->event_loop.h2o_ctx.loop),
+				              key,
+				              0,
+				              value);
+			}
+		}
+
 		query_ctx->num_query_in_progress--;
 		query_ctx->num_result++;
 
@@ -348,21 +429,8 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
 			query_ctx->error = true;
 			send_service_unavailable_error(DB_REQ_ERROR, query_ctx->req);
 		}
-		else if (query_ctx->num_result == query_ctx->num_query) {
-			if (query_ctx->do_update)
-				do_updates(query_ctx);
-			else {
-				query_ctx->gen = get_json_generator(&ctx->json_generator, &ctx->json_generator_num);
-
-				if (query_ctx->gen)
-					serialize_items(query_ctx->res,
-					                query_ctx->num_result,
-					                &query_ctx->gen,
-					                query_ctx->req);
-				else
-					send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
-			}
-		}
+		else if (query_ctx->num_result == query_ctx->num_query)
+			complete_multiple_query(query_ctx);
 
 		h2o_mem_release_shared(query_ctx);
 	}
@@ -488,13 +556,11 @@ static void process_result(PGresult *result, query_result_t *out)
 
 	const char * const id = PQgetvalue(result, 0, 0);
 	const char * const random_number = PQgetvalue(result, 0, 1);
-	const size_t id_len = PQgetlength(result, 0, 0);
-	const size_t random_number_len = PQgetlength(result, 0, 1);
 
-	assert(id && id_len && random_number && random_number_len);
+	assert(id && PQgetlength(result, 0, 0) && random_number && PQgetlength(result, 0, 1));
 
 	if (PQfformat(result, 0)) {
-		assert(id_len == sizeof(out->id));
+		assert(PQgetlength(result, 0, 0) == sizeof(out->id));
 		// Use memcpy() in case the result is not aligned; the reason we are
 		// copying over the id is because the results may arrive in any order.
 		memcpy(&out->id, id, sizeof(out->id));
@@ -508,7 +574,7 @@ static void process_result(PGresult *result, query_result_t *out)
 	assert(out->id <= MAX_ID);
 
 	if (PQfformat(result, 1)) {
-		assert(random_number_len == sizeof(out->random_number));
+		assert(PQgetlength(result, 0, 1) == sizeof(out->random_number));
 		// Use memcpy() in case the result is not aligned.
 		memcpy(&out->random_number, random_number, sizeof(out->random_number));
 		out->random_number = ntohl(out->random_number);
@@ -555,10 +621,16 @@ error_yajl:
 	send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, req);
 }
 
+int cached_queries(struct st_h2o_handler_t *self, h2o_req_t *req)
+{
+	IGNORE_FUNCTION_PARAMETER(self);
+	return do_multiple_queries(false, true, req);
+}
+
 int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req)
 {
 	IGNORE_FUNCTION_PARAMETER(self);
-	return do_multiple_queries(false, req);
+	return do_multiple_queries(false, false, req);
 }
 
 int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
@@ -600,5 +672,5 @@ int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
 int updates(struct st_h2o_handler_t *self, h2o_req_t *req)
 {
 	IGNORE_FUNCTION_PARAMETER(self);
-	return do_multiple_queries(true, req);
+	return do_multiple_queries(true, false, req);
 }

+ 1 - 0
frameworks/C/h2o/src/world.h

@@ -23,6 +23,7 @@
 
 #include <h2o.h>
 
+int cached_queries(struct st_h2o_handler_t *self, h2o_req_t *req);
 int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req);
 int single_query(struct st_h2o_handler_t *self, h2o_req_t *req);
 int updates(struct st_h2o_handler_t *self, h2o_req_t *req);