Browse Source

H2O: Prepopulate the caches

Anton Kirilov 6 years ago
parent
commit
b6d196282b

+ 2 - 1
frameworks/C/h2o/src/handlers/fortune.c

@@ -41,6 +41,7 @@
 
 #define ID_FIELD_NAME "id"
 #define FORTUNE_TABLE_NAME "Fortune"
+#define FORTUNE_QUERY "SELECT * FROM " FORTUNE_TABLE_NAME ";"
 #define MAX_IOVEC 64
 #define MESSAGE_FIELD_NAME "message"
 #define NEW_FORTUNE_ID "0"
@@ -471,7 +472,7 @@ void initialize_fortunes_handler(const config_t *config,
 	if (template) {
 		global_data->request_handler_data.fortunes_template = template;
 		add_prepared_statement(FORTUNE_TABLE_NAME,
-		                       "SELECT * FROM " FORTUNE_TABLE_NAME ";",
+		                       FORTUNE_QUERY,
 		                       &global_data->prepared_statements);
 		register_request_handler("/fortunes", fortunes, hostconf, log_handle);
 	}

+ 2 - 0
frameworks/C/h2o/src/handlers/request_handler_data.h

@@ -22,6 +22,7 @@
 #define REQUEST_HANDLER_DATA_H_
 
 #include <h2o.h>
+#include <stdbool.h>
 
 struct mustache_token_t;
 
@@ -31,6 +32,7 @@ typedef struct {
 
 typedef struct {
 	h2o_cache_t *world_cache;
+	bool populate_world_cache;
 } request_handler_thread_data_t;
 
 #endif // REQUEST_HANDLER_DATA_H_

+ 96 - 23
frameworks/C/h2o/src/handlers/world.c

@@ -64,6 +64,8 @@
 
 #define USE_CACHE 2
 #define WORLD_TABLE_NAME "World"
+#define POPULATE_CACHE_QUERY "SELECT * FROM " WORLD_TABLE_NAME ";"
+#define WORLD_QUERY "SELECT * FROM " WORLD_TABLE_NAME " WHERE id = $1::integer;"
 
 typedef struct multiple_query_ctx_t multiple_query_ctx_t;
 typedef struct update_ctx_t update_ctx_t;
@@ -73,6 +75,11 @@ typedef struct {
 	uint32_t random_number;
 } query_result_t;
 
+typedef struct {
+	thread_context_t *ctx;
+	db_query_param_t param;
+} populate_cache_ctx_t;
+
 typedef struct {
 	multiple_query_ctx_t *ctx;
 	const char *id_pointer;
@@ -113,21 +120,23 @@ static int compare_items(const void *x, const void *y);
 static void complete_multiple_query(multiple_query_ctx_t *query_ctx);
 static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req);
 static void do_updates(multiple_query_ctx_t *query_ctx);
-static void fetch_from_cache(uint64_t now,
-                             h2o_cache_t *world_cache,
-                             multiple_query_ctx_t *query_ctx);
-static void free_world_cache_entry(h2o_iovec_t value);
+static void fetch_from_cache(uint64_t now, h2o_cache_t *cache, multiple_query_ctx_t *query_ctx);
+static void free_cache_entry(h2o_iovec_t value);
 static size_t get_query_number(h2o_req_t *req);
 static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed);
 static int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req);
 static void on_multiple_query_error(db_query_param_t *param, const char *error_string);
 static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result);
 static void on_multiple_query_timeout(db_query_param_t *param);
+static void on_populate_cache_error(db_query_param_t *param, const char *error_string);
+static result_return_t on_populate_cache_result(db_query_param_t *param, PGresult *result);
+static void on_populate_cache_timeout(db_query_param_t *param);
 static void on_single_query_error(db_query_param_t *param, const char *error_string);
 static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result);
 static void on_single_query_timeout(db_query_param_t *param);
 static result_return_t on_update_result(db_query_param_t *param, PGresult *result);
-static void process_result(PGresult *result, query_result_t *out);
+static void populate_cache(thread_context_t *ctx);
+static void process_result(PGresult *result, size_t idx, query_result_t *out);
 static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen);
 static void serialize_items(const query_result_t *res,
                             size_t num_result,
@@ -258,6 +267,9 @@ static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req)
 			query_ctx->flags |= DO_UPDATE;
 
 		if (use_cache) {
+			if (ctx->request_handler_data.populate_world_cache)
+				populate_cache(ctx);
+
 			query_ctx->flags |= USE_CACHE;
 			fetch_from_cache(h2o_now(ctx->event_loop.h2o_ctx.loop),
 			                 ctx->request_handler_data.world_cache,
@@ -368,28 +380,26 @@ error:
 	send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
 }
 
-static void fetch_from_cache(uint64_t now,
-                             h2o_cache_t *world_cache,
-                             multiple_query_ctx_t *query_ctx)
+static void fetch_from_cache(uint64_t now, h2o_cache_t *cache, multiple_query_ctx_t *query_ctx)
 {
 	h2o_iovec_t key = {.len = sizeof(query_ctx->res->id)};
 
 	for (size_t i = 0; i < query_ctx->num_query; i++) {
 		key.base = (char *) &query_ctx->res[i].id;
 
-		h2o_cache_ref_t * const r = h2o_cache_fetch(world_cache, now, key, 0);
+		h2o_cache_ref_t * const r = h2o_cache_fetch(cache, now, key, 0);
 
 		if (r) {
 			query_ctx->res[i].id = query_ctx->res[query_ctx->num_result].id;
 			memcpy(query_ctx->res + query_ctx->num_result++,
 			       r->value.base,
 			       sizeof(*query_ctx->res));
-			h2o_cache_release(world_cache, r);
+			h2o_cache_release(cache, r);
 		}
 	}
 }
 
-static void free_world_cache_entry(h2o_iovec_t value)
+static void free_cache_entry(h2o_iovec_t value)
 {
 	free(value.base);
 }
@@ -469,7 +479,8 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
 			cleanup_multiple_query(query_ctx);
 	}
 	else if (PQresultStatus(result) == PGRES_TUPLES_OK) {
-		process_result(result, query_ctx->res + query_ctx->num_result);
+		assert(PQntuples(result) == 1);
+		process_result(result, 0, query_ctx->res + query_ctx->num_result);
 
 		if (query_ctx->flags & USE_CACHE) {
 			query_result_t * const r = malloc(sizeof(*r));
@@ -533,6 +544,51 @@ static void on_multiple_query_timeout(db_query_param_t *param)
 	}
 }
 
+static void on_populate_cache_error(db_query_param_t *param, const char *error_string)
+{
+	IGNORE_FUNCTION_PARAMETER(error_string);
+	free(H2O_STRUCT_FROM_MEMBER(populate_cache_ctx_t, param, param));
+}
+
+static result_return_t on_populate_cache_result(db_query_param_t *param, PGresult *result)
+{
+	populate_cache_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(populate_cache_ctx_t,
+	                                                                param,
+	                                                                param);
+
+	if (PQresultStatus(result) == PGRES_TUPLES_OK) {
+		const size_t num_rows = PQntuples(result);
+
+		for (size_t i = 0; i < num_rows; i++) {
+			query_result_t * const r = calloc(1, sizeof(*r));
+
+			if (r) {
+				process_result(result, i, r);
+
+				const h2o_iovec_t key = {.base = (char *) &r->id, .len = sizeof(r->id)};
+				const h2o_iovec_t value = {.base = (char *) r, .len = sizeof(*r)};
+
+				h2o_cache_set(query_ctx->ctx->request_handler_data.world_cache,
+				              h2o_now(query_ctx->ctx->event_loop.h2o_ctx.loop),
+				              key,
+				              0,
+				              value);
+			}
+		}
+	}
+	else
+		LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
+
+	PQclear(result);
+	free(query_ctx);
+	return DONE;
+}
+
+static void on_populate_cache_timeout(db_query_param_t *param)
+{
+	free(H2O_STRUCT_FROM_MEMBER(populate_cache_ctx_t, param, param));
+}
+
 static void on_single_query_error(db_query_param_t *param, const char *error_string)
 {
 	single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t, param, param);
@@ -637,18 +693,36 @@ static result_return_t on_update_result(db_query_param_t *param, PGresult *resul
 	return DONE;
 }
 
-static void process_result(PGresult *result, query_result_t *out)
+static void populate_cache(thread_context_t *ctx)
+{
+	populate_cache_ctx_t * const query_ctx = calloc(1, sizeof(*query_ctx));
+
+	if (query_ctx) {
+		query_ctx->ctx = ctx;
+		query_ctx->param.command = POPULATE_CACHE_QUERY;
+		query_ctx->param.on_error = on_populate_cache_error;
+		query_ctx->param.on_result = on_populate_cache_result;
+		query_ctx->param.on_timeout = on_populate_cache_timeout;
+
+		if (execute_query(ctx, &query_ctx->param))
+			free(query_ctx);
+		else
+			ctx->request_handler_data.populate_world_cache = false;
+	}
+}
+
+static void process_result(PGresult *result, size_t idx, query_result_t *out)
 {
 	assert(PQnfields(result) == 2);
-	assert(PQntuples(result) == 1);
+	assert((size_t) PQntuples(result) > idx);
 
-	const char * const id = PQgetvalue(result, 0, 0);
-	const char * const random_number = PQgetvalue(result, 0, 1);
+	const char * const id = PQgetvalue(result, idx, 0);
+	const char * const random_number = PQgetvalue(result, idx, 1);
 
-	assert(id && PQgetlength(result, 0, 0) && random_number && PQgetlength(result, 0, 1));
+	assert(id && PQgetlength(result, idx, 0) && random_number && PQgetlength(result, idx, 1));
 
 	if (PQfformat(result, 0)) {
-		assert(PQgetlength(result, 0, 0) == sizeof(out->id));
+		assert(PQgetlength(result, idx, 0) == sizeof(out->id));
 		// Use memcpy() in case the result is not aligned; the reason we are
 		// copying over the id is because the results may arrive in any order.
 		memcpy(&out->id, id, sizeof(out->id));
@@ -662,7 +736,7 @@ static void process_result(PGresult *result, query_result_t *out)
 	assert(out->id <= MAX_ID);
 
 	if (PQfformat(result, 1)) {
-		assert(PQgetlength(result, 0, 1) == sizeof(out->random_number));
+		assert(PQgetlength(result, idx, 1) == sizeof(out->random_number));
 		// Use memcpy() in case the result is not aligned.
 		memcpy(&out->random_number, random_number, sizeof(out->random_number));
 		out->random_number = ntohl(out->random_number);
@@ -766,10 +840,11 @@ void free_world_handler_thread_data(request_handler_thread_data_t *request_handl
 
 void initialize_world_handler_thread_data(request_handler_thread_data_t *request_handler_data)
 {
+	request_handler_data->populate_world_cache = true;
 	request_handler_data->world_cache = h2o_cache_create(0,
 	                                                     CACHE_CAPACITY,
 	                                                     CACHE_DURATION,
-	                                                     free_world_cache_entry);
+	                                                     free_cache_entry);
 
 	if (!request_handler_data->world_cache)
 		abort();
@@ -779,9 +854,7 @@ void initialize_world_handlers(global_data_t *global_data,
                                h2o_hostconf_t *hostconf,
                                h2o_access_log_filehandle_t *log_handle)
 {
-	add_prepared_statement(WORLD_TABLE_NAME,
-	                       "SELECT * FROM " WORLD_TABLE_NAME " WHERE id = $1::integer;",
-	                       &global_data->prepared_statements);
+	add_prepared_statement(WORLD_TABLE_NAME, WORLD_QUERY, &global_data->prepared_statements);
 	register_request_handler("/cached-worlds", cached_queries, hostconf, log_handle);
 	register_request_handler("/db", single_query, hostconf, log_handle);
 	register_request_handler("/queries", multiple_queries, hostconf, log_handle);

+ 1 - 0
frameworks/C/h2o/src/thread.c

@@ -35,6 +35,7 @@
 #include "error.h"
 #include "event_loop.h"
 #include "global_data.h"
+#include "request_handler.h"
 #include "thread.h"
 #include "utility.h"