|
@@ -64,6 +64,8 @@
|
|
|
|
|
|
#define USE_CACHE 2
|
|
#define USE_CACHE 2
|
|
#define WORLD_TABLE_NAME "World"
|
|
#define WORLD_TABLE_NAME "World"
|
|
|
|
+#define POPULATE_CACHE_QUERY "SELECT * FROM " WORLD_TABLE_NAME ";"
|
|
|
|
+#define WORLD_QUERY "SELECT * FROM " WORLD_TABLE_NAME " WHERE id = $1::integer;"
|
|
|
|
|
|
typedef struct multiple_query_ctx_t multiple_query_ctx_t;
|
|
typedef struct multiple_query_ctx_t multiple_query_ctx_t;
|
|
typedef struct update_ctx_t update_ctx_t;
|
|
typedef struct update_ctx_t update_ctx_t;
|
|
@@ -73,6 +75,11 @@ typedef struct {
|
|
uint32_t random_number;
|
|
uint32_t random_number;
|
|
} query_result_t;
|
|
} query_result_t;
|
|
|
|
|
|
|
|
+typedef struct {
|
|
|
|
+ thread_context_t *ctx;
|
|
|
|
+ db_query_param_t param;
|
|
|
|
+} populate_cache_ctx_t;
|
|
|
|
+
|
|
typedef struct {
|
|
typedef struct {
|
|
multiple_query_ctx_t *ctx;
|
|
multiple_query_ctx_t *ctx;
|
|
const char *id_pointer;
|
|
const char *id_pointer;
|
|
@@ -113,21 +120,23 @@ static int compare_items(const void *x, const void *y);
|
|
static void complete_multiple_query(multiple_query_ctx_t *query_ctx);
|
|
static void complete_multiple_query(multiple_query_ctx_t *query_ctx);
|
|
static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req);
|
|
static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req);
|
|
static void do_updates(multiple_query_ctx_t *query_ctx);
|
|
static void do_updates(multiple_query_ctx_t *query_ctx);
|
|
-static void fetch_from_cache(uint64_t now,
|
|
|
|
- h2o_cache_t *world_cache,
|
|
|
|
- multiple_query_ctx_t *query_ctx);
|
|
|
|
-static void free_world_cache_entry(h2o_iovec_t value);
|
|
|
|
|
|
+static void fetch_from_cache(uint64_t now, h2o_cache_t *cache, multiple_query_ctx_t *query_ctx);
|
|
|
|
+static void free_cache_entry(h2o_iovec_t value);
|
|
static size_t get_query_number(h2o_req_t *req);
|
|
static size_t get_query_number(h2o_req_t *req);
|
|
static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed);
|
|
static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed);
|
|
static int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req);
|
|
static int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req);
|
|
static void on_multiple_query_error(db_query_param_t *param, const char *error_string);
|
|
static void on_multiple_query_error(db_query_param_t *param, const char *error_string);
|
|
static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result);
|
|
static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result);
|
|
static void on_multiple_query_timeout(db_query_param_t *param);
|
|
static void on_multiple_query_timeout(db_query_param_t *param);
|
|
|
|
+static void on_populate_cache_error(db_query_param_t *param, const char *error_string);
|
|
|
|
+static result_return_t on_populate_cache_result(db_query_param_t *param, PGresult *result);
|
|
|
|
+static void on_populate_cache_timeout(db_query_param_t *param);
|
|
static void on_single_query_error(db_query_param_t *param, const char *error_string);
|
|
static void on_single_query_error(db_query_param_t *param, const char *error_string);
|
|
static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result);
|
|
static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result);
|
|
static void on_single_query_timeout(db_query_param_t *param);
|
|
static void on_single_query_timeout(db_query_param_t *param);
|
|
static result_return_t on_update_result(db_query_param_t *param, PGresult *result);
|
|
static result_return_t on_update_result(db_query_param_t *param, PGresult *result);
|
|
-static void process_result(PGresult *result, query_result_t *out);
|
|
|
|
|
|
+static void populate_cache(thread_context_t *ctx);
|
|
|
|
+static void process_result(PGresult *result, size_t idx, query_result_t *out);
|
|
static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen);
|
|
static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen);
|
|
static void serialize_items(const query_result_t *res,
|
|
static void serialize_items(const query_result_t *res,
|
|
size_t num_result,
|
|
size_t num_result,
|
|
@@ -258,6 +267,9 @@ static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req)
|
|
query_ctx->flags |= DO_UPDATE;
|
|
query_ctx->flags |= DO_UPDATE;
|
|
|
|
|
|
if (use_cache) {
|
|
if (use_cache) {
|
|
|
|
+ if (ctx->request_handler_data.populate_world_cache)
|
|
|
|
+ populate_cache(ctx);
|
|
|
|
+
|
|
query_ctx->flags |= USE_CACHE;
|
|
query_ctx->flags |= USE_CACHE;
|
|
fetch_from_cache(h2o_now(ctx->event_loop.h2o_ctx.loop),
|
|
fetch_from_cache(h2o_now(ctx->event_loop.h2o_ctx.loop),
|
|
ctx->request_handler_data.world_cache,
|
|
ctx->request_handler_data.world_cache,
|
|
@@ -368,28 +380,26 @@ error:
|
|
send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
|
|
send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
|
|
}
|
|
}
|
|
|
|
|
|
-static void fetch_from_cache(uint64_t now,
|
|
|
|
- h2o_cache_t *world_cache,
|
|
|
|
- multiple_query_ctx_t *query_ctx)
|
|
|
|
|
|
+static void fetch_from_cache(uint64_t now, h2o_cache_t *cache, multiple_query_ctx_t *query_ctx)
|
|
{
|
|
{
|
|
h2o_iovec_t key = {.len = sizeof(query_ctx->res->id)};
|
|
h2o_iovec_t key = {.len = sizeof(query_ctx->res->id)};
|
|
|
|
|
|
for (size_t i = 0; i < query_ctx->num_query; i++) {
|
|
for (size_t i = 0; i < query_ctx->num_query; i++) {
|
|
key.base = (char *) &query_ctx->res[i].id;
|
|
key.base = (char *) &query_ctx->res[i].id;
|
|
|
|
|
|
- h2o_cache_ref_t * const r = h2o_cache_fetch(world_cache, now, key, 0);
|
|
|
|
|
|
+ h2o_cache_ref_t * const r = h2o_cache_fetch(cache, now, key, 0);
|
|
|
|
|
|
if (r) {
|
|
if (r) {
|
|
query_ctx->res[i].id = query_ctx->res[query_ctx->num_result].id;
|
|
query_ctx->res[i].id = query_ctx->res[query_ctx->num_result].id;
|
|
memcpy(query_ctx->res + query_ctx->num_result++,
|
|
memcpy(query_ctx->res + query_ctx->num_result++,
|
|
r->value.base,
|
|
r->value.base,
|
|
sizeof(*query_ctx->res));
|
|
sizeof(*query_ctx->res));
|
|
- h2o_cache_release(world_cache, r);
|
|
|
|
|
|
+ h2o_cache_release(cache, r);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-static void free_world_cache_entry(h2o_iovec_t value)
|
|
|
|
|
|
+static void free_cache_entry(h2o_iovec_t value)
|
|
{
|
|
{
|
|
free(value.base);
|
|
free(value.base);
|
|
}
|
|
}
|
|
@@ -469,7 +479,8 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
|
|
cleanup_multiple_query(query_ctx);
|
|
cleanup_multiple_query(query_ctx);
|
|
}
|
|
}
|
|
else if (PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
else if (PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
- process_result(result, query_ctx->res + query_ctx->num_result);
|
|
|
|
|
|
+ assert(PQntuples(result) == 1);
|
|
|
|
+ process_result(result, 0, query_ctx->res + query_ctx->num_result);
|
|
|
|
|
|
if (query_ctx->flags & USE_CACHE) {
|
|
if (query_ctx->flags & USE_CACHE) {
|
|
query_result_t * const r = malloc(sizeof(*r));
|
|
query_result_t * const r = malloc(sizeof(*r));
|
|
@@ -533,6 +544,51 @@ static void on_multiple_query_timeout(db_query_param_t *param)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+static void on_populate_cache_error(db_query_param_t *param, const char *error_string)
|
|
|
|
+{
|
|
|
|
+ IGNORE_FUNCTION_PARAMETER(error_string);
|
|
|
|
+ free(H2O_STRUCT_FROM_MEMBER(populate_cache_ctx_t, param, param));
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static result_return_t on_populate_cache_result(db_query_param_t *param, PGresult *result)
|
|
|
|
+{
|
|
|
|
+ populate_cache_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(populate_cache_ctx_t,
|
|
|
|
+ param,
|
|
|
|
+ param);
|
|
|
|
+
|
|
|
|
+ if (PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
|
|
+ const size_t num_rows = PQntuples(result);
|
|
|
|
+
|
|
|
|
+ for (size_t i = 0; i < num_rows; i++) {
|
|
|
|
+ query_result_t * const r = calloc(1, sizeof(*r));
|
|
|
|
+
|
|
|
|
+ if (r) {
|
|
|
|
+ process_result(result, i, r);
|
|
|
|
+
|
|
|
|
+ const h2o_iovec_t key = {.base = (char *) &r->id, .len = sizeof(r->id)};
|
|
|
|
+ const h2o_iovec_t value = {.base = (char *) r, .len = sizeof(*r)};
|
|
|
|
+
|
|
|
|
+ h2o_cache_set(query_ctx->ctx->request_handler_data.world_cache,
|
|
|
|
+ h2o_now(query_ctx->ctx->event_loop.h2o_ctx.loop),
|
|
|
|
+ key,
|
|
|
|
+ 0,
|
|
|
|
+ value);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
|
|
+
|
|
|
|
+ PQclear(result);
|
|
|
|
+ free(query_ctx);
|
|
|
|
+ return DONE;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static void on_populate_cache_timeout(db_query_param_t *param)
|
|
|
|
+{
|
|
|
|
+ free(H2O_STRUCT_FROM_MEMBER(populate_cache_ctx_t, param, param));
|
|
|
|
+}
|
|
|
|
+
|
|
static void on_single_query_error(db_query_param_t *param, const char *error_string)
|
|
static void on_single_query_error(db_query_param_t *param, const char *error_string)
|
|
{
|
|
{
|
|
single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t, param, param);
|
|
single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t, param, param);
|
|
@@ -637,18 +693,36 @@ static result_return_t on_update_result(db_query_param_t *param, PGresult *resul
|
|
return DONE;
|
|
return DONE;
|
|
}
|
|
}
|
|
|
|
|
|
-static void process_result(PGresult *result, query_result_t *out)
|
|
|
|
|
|
+static void populate_cache(thread_context_t *ctx)
|
|
|
|
+{
|
|
|
|
+ populate_cache_ctx_t * const query_ctx = calloc(1, sizeof(*query_ctx));
|
|
|
|
+
|
|
|
|
+ if (query_ctx) {
|
|
|
|
+ query_ctx->ctx = ctx;
|
|
|
|
+ query_ctx->param.command = POPULATE_CACHE_QUERY;
|
|
|
|
+ query_ctx->param.on_error = on_populate_cache_error;
|
|
|
|
+ query_ctx->param.on_result = on_populate_cache_result;
|
|
|
|
+ query_ctx->param.on_timeout = on_populate_cache_timeout;
|
|
|
|
+
|
|
|
|
+ if (execute_query(ctx, &query_ctx->param))
|
|
|
|
+ free(query_ctx);
|
|
|
|
+ else
|
|
|
|
+ ctx->request_handler_data.populate_world_cache = false;
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static void process_result(PGresult *result, size_t idx, query_result_t *out)
|
|
{
|
|
{
|
|
assert(PQnfields(result) == 2);
|
|
assert(PQnfields(result) == 2);
|
|
- assert(PQntuples(result) == 1);
|
|
|
|
|
|
+ assert((size_t) PQntuples(result) > idx);
|
|
|
|
|
|
- const char * const id = PQgetvalue(result, 0, 0);
|
|
|
|
- const char * const random_number = PQgetvalue(result, 0, 1);
|
|
|
|
|
|
+ const char * const id = PQgetvalue(result, idx, 0);
|
|
|
|
+ const char * const random_number = PQgetvalue(result, idx, 1);
|
|
|
|
|
|
- assert(id && PQgetlength(result, 0, 0) && random_number && PQgetlength(result, 0, 1));
|
|
|
|
|
|
+ assert(id && PQgetlength(result, idx, 0) && random_number && PQgetlength(result, idx, 1));
|
|
|
|
|
|
if (PQfformat(result, 0)) {
|
|
if (PQfformat(result, 0)) {
|
|
- assert(PQgetlength(result, 0, 0) == sizeof(out->id));
|
|
|
|
|
|
+ assert(PQgetlength(result, idx, 0) == sizeof(out->id));
|
|
// Use memcpy() in case the result is not aligned; the reason we are
|
|
// Use memcpy() in case the result is not aligned; the reason we are
|
|
// copying over the id is because the results may arrive in any order.
|
|
// copying over the id is because the results may arrive in any order.
|
|
memcpy(&out->id, id, sizeof(out->id));
|
|
memcpy(&out->id, id, sizeof(out->id));
|
|
@@ -662,7 +736,7 @@ static void process_result(PGresult *result, query_result_t *out)
|
|
assert(out->id <= MAX_ID);
|
|
assert(out->id <= MAX_ID);
|
|
|
|
|
|
if (PQfformat(result, 1)) {
|
|
if (PQfformat(result, 1)) {
|
|
- assert(PQgetlength(result, 0, 1) == sizeof(out->random_number));
|
|
|
|
|
|
+ assert(PQgetlength(result, idx, 1) == sizeof(out->random_number));
|
|
// Use memcpy() in case the result is not aligned.
|
|
// Use memcpy() in case the result is not aligned.
|
|
memcpy(&out->random_number, random_number, sizeof(out->random_number));
|
|
memcpy(&out->random_number, random_number, sizeof(out->random_number));
|
|
out->random_number = ntohl(out->random_number);
|
|
out->random_number = ntohl(out->random_number);
|
|
@@ -766,10 +840,11 @@ void free_world_handler_thread_data(request_handler_thread_data_t *request_handl
|
|
|
|
|
|
void initialize_world_handler_thread_data(request_handler_thread_data_t *request_handler_data)
|
|
void initialize_world_handler_thread_data(request_handler_thread_data_t *request_handler_data)
|
|
{
|
|
{
|
|
|
|
+ request_handler_data->populate_world_cache = true;
|
|
request_handler_data->world_cache = h2o_cache_create(0,
|
|
request_handler_data->world_cache = h2o_cache_create(0,
|
|
CACHE_CAPACITY,
|
|
CACHE_CAPACITY,
|
|
CACHE_DURATION,
|
|
CACHE_DURATION,
|
|
- free_world_cache_entry);
|
|
|
|
|
|
+ free_cache_entry);
|
|
|
|
|
|
if (!request_handler_data->world_cache)
|
|
if (!request_handler_data->world_cache)
|
|
abort();
|
|
abort();
|
|
@@ -779,9 +854,7 @@ void initialize_world_handlers(global_data_t *global_data,
|
|
h2o_hostconf_t *hostconf,
|
|
h2o_hostconf_t *hostconf,
|
|
h2o_access_log_filehandle_t *log_handle)
|
|
h2o_access_log_filehandle_t *log_handle)
|
|
{
|
|
{
|
|
- add_prepared_statement(WORLD_TABLE_NAME,
|
|
|
|
- "SELECT * FROM " WORLD_TABLE_NAME " WHERE id = $1::integer;",
|
|
|
|
- &global_data->prepared_statements);
|
|
|
|
|
|
+ add_prepared_statement(WORLD_TABLE_NAME, WORLD_QUERY, &global_data->prepared_statements);
|
|
register_request_handler("/cached-worlds", cached_queries, hostconf, log_handle);
|
|
register_request_handler("/cached-worlds", cached_queries, hostconf, log_handle);
|
|
register_request_handler("/db", single_query, hostconf, log_handle);
|
|
register_request_handler("/db", single_query, hostconf, log_handle);
|
|
register_request_handler("/queries", multiple_queries, hostconf, log_handle);
|
|
register_request_handler("/queries", multiple_queries, hostconf, log_handle);
|