|
@@ -67,23 +67,28 @@ typedef struct {
|
|
|
uint32_t id;
|
|
|
int id_format;
|
|
|
int id_len;
|
|
|
+ bool cleanup;
|
|
|
db_query_param_t param;
|
|
|
} single_query_ctx_t;
|
|
|
|
|
|
struct multiple_query_ctx_t {
|
|
|
+ thread_context_t *ctx;
|
|
|
json_generator_t *gen;
|
|
|
h2o_req_t *req;
|
|
|
query_param_t *query_param;
|
|
|
size_t num_query;
|
|
|
size_t num_query_in_progress;
|
|
|
size_t num_result;
|
|
|
+ bool cleanup;
|
|
|
bool do_update;
|
|
|
- bool error;
|
|
|
bool use_cache;
|
|
|
query_result_t res[];
|
|
|
};
|
|
|
|
|
|
-static void cleanup_multiple_query(void *data);
|
|
|
+static void cleanup_multiple_query(multiple_query_ctx_t *query_ctx);
|
|
|
+static void cleanup_multiple_query_request(void *data);
|
|
|
+static void cleanup_single_query(single_query_ctx_t *query_ctx);
|
|
|
+static void cleanup_single_query_request(void *data);
|
|
|
static int compare_items(const void *x, const void *y);
|
|
|
static void complete_multiple_query(multiple_query_ctx_t *query_ctx);
|
|
|
static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req);
|
|
@@ -105,20 +110,43 @@ static void serialize_items(const query_result_t *res,
|
|
|
json_generator_t **gen,
|
|
|
h2o_req_t *req);
|
|
|
|
|
|
-static void cleanup_multiple_query(void *data)
|
|
|
+static void cleanup_multiple_query(multiple_query_ctx_t *query_ctx)
|
|
|
{
|
|
|
- const multiple_query_ctx_t * const query_ctx = data;
|
|
|
+ if (query_ctx->gen)
|
|
|
+ free_json_generator(query_ctx->gen,
|
|
|
+ &query_ctx->ctx->json_generator,
|
|
|
+ &query_ctx->ctx->json_generator_num,
|
|
|
+ query_ctx->ctx->config->max_json_generator);
|
|
|
|
|
|
- if (query_ctx->gen) {
|
|
|
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
- event_loop.h2o_ctx,
|
|
|
- query_ctx->req->conn->ctx);
|
|
|
+ free(query_ctx);
|
|
|
+}
|
|
|
|
|
|
- free_json_generator(query_ctx->gen,
|
|
|
- &ctx->json_generator,
|
|
|
- &ctx->json_generator_num,
|
|
|
- ctx->config->max_json_generator);
|
|
|
+
|
|
|
+static void cleanup_multiple_query_request(void *data)
|
|
|
+{
|
|
|
+ multiple_query_ctx_t * const query_ctx = *(multiple_query_ctx_t **) data;
|
|
|
+
|
|
|
+ if (query_ctx->cleanup) {
|
|
|
+ if (!query_ctx->num_query_in_progress)
|
|
|
+ cleanup_multiple_query(query_ctx);
|
|
|
}
|
|
|
+ else
|
|
|
+ query_ctx->cleanup = true;
|
|
|
+}
|
|
|
+
|
|
|
+static void cleanup_single_query(single_query_ctx_t *query_ctx)
|
|
|
+{
|
|
|
+ free(query_ctx);
|
|
|
+}
|
|
|
+
|
|
|
+static void cleanup_single_query_request(void *data)
|
|
|
+{
|
|
|
+ single_query_ctx_t * const query_ctx = *(single_query_ctx_t **) data;
|
|
|
+
|
|
|
+ if (query_ctx->cleanup)
|
|
|
+ cleanup_single_query(query_ctx);
|
|
|
+ else
|
|
|
+ query_ctx->cleanup = true;
|
|
|
}
|
|
|
|
|
|
static int compare_items(const void *x, const void *y)
|
|
@@ -136,11 +164,9 @@ static void complete_multiple_query(multiple_query_ctx_t *query_ctx)
|
|
|
if (query_ctx->do_update)
|
|
|
do_updates(query_ctx);
|
|
|
else {
|
|
|
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
- event_loop.h2o_ctx,
|
|
|
- query_ctx->req->conn->ctx);
|
|
|
-
|
|
|
- query_ctx->gen = get_json_generator(&ctx->json_generator, &ctx->json_generator_num);
|
|
|
+ query_ctx->cleanup = true;
|
|
|
+ query_ctx->gen = get_json_generator(&query_ctx->ctx->json_generator,
|
|
|
+ &query_ctx->ctx->json_generator_num);
|
|
|
|
|
|
if (query_ctx->gen)
|
|
|
serialize_items(query_ctx->res,
|
|
@@ -180,12 +206,15 @@ static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req)
|
|
|
sz += update_query_len - reuse_size;
|
|
|
}
|
|
|
|
|
|
- multiple_query_ctx_t * const query_ctx = h2o_mem_alloc_shared(&req->pool,
|
|
|
- sz,
|
|
|
- cleanup_multiple_query);
|
|
|
+ multiple_query_ctx_t * const query_ctx = calloc(1, sz);
|
|
|
|
|
|
if (query_ctx) {
|
|
|
- memset(query_ctx, 0, sz);
|
|
|
+ multiple_query_ctx_t ** const p = h2o_mem_alloc_shared(&req->pool,
|
|
|
+ sizeof(*p),
|
|
|
+ cleanup_multiple_query_request);
|
|
|
+
|
|
|
+ *p = query_ctx;
|
|
|
+ query_ctx->ctx = ctx;
|
|
|
query_ctx->num_query = num_query;
|
|
|
query_ctx->req = req;
|
|
|
query_ctx->do_update = do_update;
|
|
@@ -228,12 +257,10 @@ static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req)
|
|
|
|
|
|
if (execute_query(ctx, &query_ctx->query_param[i].param)) {
|
|
|
query_ctx->num_query_in_progress = i;
|
|
|
- query_ctx->error = true;
|
|
|
+ query_ctx->cleanup = true;
|
|
|
send_service_unavailable_error(DB_REQ_ERROR, req);
|
|
|
return 0;
|
|
|
}
|
|
|
-
|
|
|
- h2o_mem_addref_shared(query_ctx);
|
|
|
}
|
|
|
}
|
|
|
else
|
|
@@ -244,9 +271,6 @@ static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req)
|
|
|
|
|
|
static void do_updates(multiple_query_ctx_t *query_ctx)
|
|
|
{
|
|
|
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
- event_loop.h2o_ctx,
|
|
|
- query_ctx->req->conn->ctx);
|
|
|
char *iter = (char *) (query_ctx->query_param + 1);
|
|
|
size_t sz = MAX_UPDATE_QUERY_LEN(query_ctx->num_result);
|
|
|
|
|
@@ -259,7 +283,7 @@ static void do_updates(multiple_query_ctx_t *query_ctx)
|
|
|
query_ctx->query_param->param.paramLengths = NULL;
|
|
|
query_ctx->query_param->param.paramValues = NULL;
|
|
|
query_ctx->query_param->param.flags = 0;
|
|
|
- query_ctx->res->random_number = get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
+ query_ctx->res->random_number = 1 + get_random_number(MAX_ID, &query_ctx->ctx->random_seed);
|
|
|
|
|
|
int c = snprintf(iter,
|
|
|
sz,
|
|
@@ -274,7 +298,8 @@ static void do_updates(multiple_query_ctx_t *query_ctx)
|
|
|
sz -= c;
|
|
|
|
|
|
for (size_t i = 1; i < query_ctx->num_result; i++) {
|
|
|
- query_ctx->res[i].random_number = get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
+ query_ctx->res[i].random_number = 1 + get_random_number(MAX_ID,
|
|
|
+ &query_ctx->ctx->random_seed);
|
|
|
c = snprintf(iter,
|
|
|
sz,
|
|
|
UPDATE_QUERY_ELEM,
|
|
@@ -293,15 +318,16 @@ static void do_updates(multiple_query_ctx_t *query_ctx)
|
|
|
if ((size_t) c >= sz)
|
|
|
goto error;
|
|
|
|
|
|
- if (execute_query(ctx, &query_ctx->query_param->param))
|
|
|
+ if (execute_query(query_ctx->ctx, &query_ctx->query_param->param)) {
|
|
|
+ query_ctx->cleanup = true;
|
|
|
send_service_unavailable_error(DB_REQ_ERROR, query_ctx->req);
|
|
|
- else {
|
|
|
- query_ctx->num_query_in_progress++;
|
|
|
- h2o_mem_addref_shared(query_ctx);
|
|
|
}
|
|
|
+ else
|
|
|
+ query_ctx->num_query_in_progress++;
|
|
|
|
|
|
return;
|
|
|
error:
|
|
|
+ query_ctx->cleanup = true;
|
|
|
LIBRARY_ERROR("snprintf", "Truncated output.");
|
|
|
send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
|
|
|
}
|
|
@@ -370,13 +396,16 @@ static void on_multiple_query_error(db_query_param_t *param, const char *error_s
|
|
|
const query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t, param, param);
|
|
|
multiple_query_ctx_t * const query_ctx = query_param->ctx;
|
|
|
|
|
|
- if (!query_ctx->error) {
|
|
|
- query_ctx->error = true;
|
|
|
+ query_ctx->num_query_in_progress--;
|
|
|
+
|
|
|
+ if (query_ctx->cleanup) {
|
|
|
+ if (!query_ctx->num_query_in_progress)
|
|
|
+ cleanup_multiple_query(query_ctx);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ query_ctx->cleanup = true;
|
|
|
send_error(BAD_GATEWAY, error_string, query_ctx->req);
|
|
|
}
|
|
|
-
|
|
|
- query_ctx->num_query_in_progress--;
|
|
|
- h2o_mem_release_shared(query_ctx);
|
|
|
}
|
|
|
|
|
|
static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result)
|
|
@@ -384,11 +413,13 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
|
|
|
query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t, param, param);
|
|
|
multiple_query_ctx_t * const query_ctx = query_param->ctx;
|
|
|
|
|
|
- if (!query_ctx->error && PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
|
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
- event_loop.h2o_ctx,
|
|
|
- query_ctx->req->conn->ctx);
|
|
|
+ query_ctx->num_query_in_progress--;
|
|
|
|
|
|
+ if (query_ctx->cleanup) {
|
|
|
+ if (!query_ctx->num_query_in_progress)
|
|
|
+ cleanup_multiple_query(query_ctx);
|
|
|
+ }
|
|
|
+ else if (PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
|
process_result(result, query_ctx->res + query_ctx->num_result);
|
|
|
|
|
|
if (query_ctx->use_cache) {
|
|
@@ -399,14 +430,13 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
|
|
|
const h2o_iovec_t value = {.base = (char *) r, .len = sizeof(*r)};
|
|
|
|
|
|
*r = query_ctx->res[query_ctx->num_result];
|
|
|
- cache_set(h2o_now(ctx->event_loop.h2o_ctx.loop),
|
|
|
+ cache_set(h2o_now(query_ctx->ctx->event_loop.h2o_ctx.loop),
|
|
|
key,
|
|
|
value,
|
|
|
- &ctx->global_data->world_cache);
|
|
|
+ &query_ctx->ctx->global_data->world_cache);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- query_ctx->num_query_in_progress--;
|
|
|
query_ctx->num_result++;
|
|
|
|
|
|
const size_t num_query_remaining = query_ctx->num_query - query_ctx->num_result;
|
|
@@ -416,25 +446,20 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
|
|
|
|
|
|
query_param->id = htonl(query_ctx->res[idx].id);
|
|
|
|
|
|
- if (!execute_query(ctx, &query_param->param)) {
|
|
|
- query_ctx->num_query_in_progress++;
|
|
|
- PQclear(result);
|
|
|
- return DONE;
|
|
|
+ if (execute_query(query_ctx->ctx, &query_param->param)) {
|
|
|
+ query_ctx->cleanup = true;
|
|
|
+ send_service_unavailable_error(DB_REQ_ERROR, query_ctx->req);
|
|
|
}
|
|
|
-
|
|
|
- query_ctx->error = true;
|
|
|
- send_service_unavailable_error(DB_REQ_ERROR, query_ctx->req);
|
|
|
+ else
|
|
|
+ query_ctx->num_query_in_progress++;
|
|
|
}
|
|
|
else if (query_ctx->num_result == query_ctx->num_query)
|
|
|
complete_multiple_query(query_ctx);
|
|
|
-
|
|
|
- h2o_mem_release_shared(query_ctx);
|
|
|
}
|
|
|
else {
|
|
|
- if (!query_ctx->error)
|
|
|
- LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
|
-
|
|
|
- on_multiple_query_error(param, DB_ERROR);
|
|
|
+ query_ctx->cleanup = true;
|
|
|
+ LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
|
+ send_error(BAD_GATEWAY, DB_ERROR, query_ctx->req);
|
|
|
}
|
|
|
|
|
|
PQclear(result);
|
|
@@ -446,27 +471,40 @@ static void on_multiple_query_timeout(db_query_param_t *param)
|
|
|
const query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t, param, param);
|
|
|
multiple_query_ctx_t * const query_ctx = query_param->ctx;
|
|
|
|
|
|
- if (!query_ctx->error) {
|
|
|
- query_ctx->error = true;
|
|
|
+ query_ctx->num_query_in_progress--;
|
|
|
+
|
|
|
+ if (query_ctx->cleanup) {
|
|
|
+ if (!query_ctx->num_query_in_progress)
|
|
|
+ cleanup_multiple_query(query_ctx);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ query_ctx->cleanup = true;
|
|
|
send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
|
|
|
}
|
|
|
-
|
|
|
- query_ctx->num_query_in_progress--;
|
|
|
- h2o_mem_release_shared(query_ctx);
|
|
|
}
|
|
|
|
|
|
static void on_single_query_error(db_query_param_t *param, const char *error_string)
|
|
|
{
|
|
|
single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t, param, param);
|
|
|
|
|
|
- send_error(BAD_GATEWAY, error_string, query_ctx->req);
|
|
|
+ if (query_ctx->cleanup)
|
|
|
+ cleanup_single_query(query_ctx);
|
|
|
+ else {
|
|
|
+ query_ctx->cleanup = true;
|
|
|
+ send_error(BAD_GATEWAY, error_string, query_ctx->req);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result)
|
|
|
{
|
|
|
single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t, param, param);
|
|
|
+ const bool cleanup = query_ctx->cleanup;
|
|
|
|
|
|
- if (PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
|
+ query_ctx->cleanup = true;
|
|
|
+
|
|
|
+ if (cleanup)
|
|
|
+ cleanup_single_query(query_ctx);
|
|
|
+ else if (PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
|
uint32_t random_number;
|
|
|
|
|
|
assert(PQnfields(result) == 2);
|
|
@@ -479,7 +517,6 @@ static result_return_t on_single_query_result(db_query_param_t *param, PGresult
|
|
|
// Use memcpy() in case the result is not aligned.
|
|
|
memcpy(&random_number, r, sizeof(random_number));
|
|
|
random_number = ntohl(random_number);
|
|
|
- PQclear(result);
|
|
|
|
|
|
thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
event_loop.h2o_ctx,
|
|
@@ -490,22 +527,22 @@ static result_return_t on_single_query_result(db_query_param_t *param, PGresult
|
|
|
if (gen) {
|
|
|
// The response is small enough, so that it is simpler to copy it
|
|
|
// instead of doing a delayed deallocation of the JSON generator.
|
|
|
- if (!serialize_item(ntohl(query_ctx->id), random_number, gen->gen) &&
|
|
|
- !send_json_response(gen, true, query_ctx->req))
|
|
|
- return DONE;
|
|
|
-
|
|
|
- // If there is a problem with the generator, don't reuse it.
|
|
|
- free_json_generator(gen, NULL, NULL, 0);
|
|
|
+ if (serialize_item(ntohl(query_ctx->id), random_number, gen->gen) ||
|
|
|
+ send_json_response(gen, true, query_ctx->req)) {
|
|
|
+ // If there is a problem with the generator, don't reuse it.
|
|
|
+ free_json_generator(gen, NULL, NULL, 0);
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
|
|
|
+ else
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
|
|
|
}
|
|
|
else {
|
|
|
LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
|
send_error(BAD_GATEWAY, DB_ERROR, query_ctx->req);
|
|
|
- PQclear(result);
|
|
|
}
|
|
|
|
|
|
+ PQclear(result);
|
|
|
return DONE;
|
|
|
}
|
|
|
|
|
@@ -513,32 +550,37 @@ static void on_single_query_timeout(db_query_param_t *param)
|
|
|
{
|
|
|
single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t, param, param);
|
|
|
|
|
|
- send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
|
|
|
+ if (query_ctx->cleanup)
|
|
|
+ cleanup_single_query(query_ctx);
|
|
|
+ else {
|
|
|
+ query_ctx->cleanup = true;
|
|
|
+ send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static result_return_t on_update_result(db_query_param_t *param, PGresult *result)
|
|
|
{
|
|
|
query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t, param, param);
|
|
|
multiple_query_ctx_t * const query_ctx = query_param->ctx;
|
|
|
+ const bool cleanup = query_ctx->cleanup;
|
|
|
|
|
|
- if (PQresultStatus(result) == PGRES_COMMAND_OK) {
|
|
|
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
- event_loop.h2o_ctx,
|
|
|
- query_ctx->req->conn->ctx);
|
|
|
+ query_ctx->cleanup = true;
|
|
|
+ query_ctx->num_query_in_progress--;
|
|
|
|
|
|
- query_ctx->num_query_in_progress--;
|
|
|
- query_ctx->gen = get_json_generator(&ctx->json_generator, &ctx->json_generator_num);
|
|
|
+ if (cleanup)
|
|
|
+ cleanup_multiple_query(query_ctx);
|
|
|
+ else if (PQresultStatus(result) == PGRES_COMMAND_OK) {
|
|
|
+ query_ctx->gen = get_json_generator(&query_ctx->ctx->json_generator,
|
|
|
+ &query_ctx->ctx->json_generator_num);
|
|
|
|
|
|
if (query_ctx->gen)
|
|
|
serialize_items(query_ctx->res, query_ctx->num_result, &query_ctx->gen, query_ctx->req);
|
|
|
else
|
|
|
send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
|
|
|
-
|
|
|
- h2o_mem_release_shared(query_ctx);
|
|
|
}
|
|
|
else {
|
|
|
LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
|
- on_multiple_query_error(param, DB_ERROR);
|
|
|
+ send_error(BAD_GATEWAY, DB_ERROR, query_ctx->req);
|
|
|
}
|
|
|
|
|
|
PQclear(result);
|
|
@@ -636,10 +678,14 @@ int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
|
thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
event_loop.h2o_ctx,
|
|
|
req->conn->ctx);
|
|
|
- single_query_ctx_t * const query_ctx = h2o_mem_alloc_pool(&req->pool, sizeof(*query_ctx));
|
|
|
+ single_query_ctx_t * const query_ctx = calloc(1, sizeof(*query_ctx));
|
|
|
|
|
|
if (query_ctx) {
|
|
|
- memset(query_ctx, 0, sizeof(*query_ctx));
|
|
|
+ single_query_ctx_t ** const p = h2o_mem_alloc_shared(&req->pool,
|
|
|
+ sizeof(*p),
|
|
|
+ cleanup_single_query_request);
|
|
|
+
|
|
|
+ *p = query_ctx;
|
|
|
query_ctx->id = htonl(get_random_number(MAX_ID, &ctx->random_seed) + 1);
|
|
|
query_ctx->id_format = 1;
|
|
|
query_ctx->id_len = sizeof(query_ctx->id);
|
|
@@ -656,8 +702,10 @@ int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
|
query_ctx->param.resultFormat = 1;
|
|
|
query_ctx->req = req;
|
|
|
|
|
|
- if (execute_query(ctx, &query_ctx->param))
|
|
|
+ if (execute_query(ctx, &query_ctx->param)) {
|
|
|
+ query_ctx->cleanup = true;
|
|
|
send_service_unavailable_error(DB_REQ_ERROR, req);
|
|
|
+ }
|
|
|
}
|
|
|
else
|
|
|
send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, req);
|