|
@@ -23,6 +23,7 @@
|
|
|
#include <stddef.h>
|
|
|
#include <stdint.h>
|
|
|
#include <stdio.h>
|
|
|
+#include <stdlib.h>
|
|
|
#include <arpa/inet.h>
|
|
|
#include <postgresql/libpq-fe.h>
|
|
|
#include <yajl/yajl_gen.h>
|
|
@@ -73,29 +74,14 @@ struct multiple_query_ctx_t {
|
|
|
size_t num_query;
|
|
|
size_t num_query_in_progress;
|
|
|
size_t num_result;
|
|
|
- query_result_t res[];
|
|
|
-};
|
|
|
-
|
|
|
-typedef struct {
|
|
|
- char *command;
|
|
|
- update_ctx_t *ctx;
|
|
|
- uint32_t random_number;
|
|
|
- bool update;
|
|
|
- db_query_param_t param;
|
|
|
-} update_param_t;
|
|
|
-
|
|
|
-struct update_ctx_t {
|
|
|
- yajl_gen gen; // also an error flag
|
|
|
- h2o_req_t *req;
|
|
|
- update_param_t *update_param;
|
|
|
- size_t num_query;
|
|
|
- size_t num_query_in_progress;
|
|
|
- size_t num_result;
|
|
|
+ bool do_update;
|
|
|
query_result_t res[];
|
|
|
};
|
|
|
|
|
|
static void cleanup_multiple_query(void *data);
|
|
|
-static void cleanup_update(void *data);
|
|
|
+static int compare_items(const void *x, const void *y);
|
|
|
+static int do_multiple_queries(bool do_update, h2o_req_t *req);
|
|
|
+static void do_updates(multiple_query_ctx_t *query_ctx);
|
|
|
static size_t get_query_number(h2o_req_t *req);
|
|
|
static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed);
|
|
|
static void on_multiple_query_error(db_query_param_t *param, const char *error_string);
|
|
@@ -104,9 +90,7 @@ static void on_multiple_query_timeout(db_query_param_t *param);
|
|
|
static void on_single_query_error(db_query_param_t *param, const char *error_string);
|
|
|
static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result);
|
|
|
static void on_single_query_timeout(db_query_param_t *param);
|
|
|
-static void on_update_error(db_query_param_t *param, const char *error_string);
|
|
|
static result_return_t on_update_result(db_query_param_t *param, PGresult *result);
|
|
|
-static void on_update_timeout(db_query_param_t *param);
|
|
|
static void process_result(PGresult *result, query_result_t *out);
|
|
|
static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen);
|
|
|
static void serialize_items(const query_result_t *res,
|
|
@@ -122,12 +106,156 @@ static void cleanup_multiple_query(void *data)
|
|
|
yajl_gen_free(query_ctx->gen);
|
|
|
}
|
|
|
|
|
|
-static void cleanup_update(void *data)
|
|
|
+static int compare_items(const void *x, const void *y)
|
|
|
+{
|
|
|
+ const query_result_t * const r1 = x;
|
|
|
+ const query_result_t * const r2 = y;
|
|
|
+
|
|
|
+ return r1->id < r2->id ? -1 : r1->id > r2->id;
|
|
|
+}
|
|
|
+
|
|
|
+static int do_multiple_queries(bool do_update, h2o_req_t *req)
|
|
|
+{
|
|
|
+ thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
+ event_loop.h2o_ctx,
|
|
|
+ req->conn->ctx);
|
|
|
+
|
|
|
+ const size_t num_query = get_query_number(req);
|
|
|
+
|
|
|
+ // MAX_QUERIES is a relatively small number, so assume no overflow in the following
|
|
|
+ // arithmetic operations.
|
|
|
+ assert(num_query <= MAX_QUERIES);
|
|
|
+
|
|
|
+ size_t base_size = offsetof(multiple_query_ctx_t, res) + num_query * sizeof(query_result_t);
|
|
|
+
|
|
|
+ base_size = ((base_size + _Alignof(query_param_t) - 1) / _Alignof(query_param_t));
|
|
|
+ base_size = base_size * _Alignof(query_param_t);
|
|
|
+
|
|
|
+ const size_t num_query_in_progress = MIN(num_query, ctx->config->max_db_conn_num);
|
|
|
+ size_t sz = base_size + num_query_in_progress * sizeof(query_param_t);
|
|
|
+
|
|
|
+ if (do_update) {
|
|
|
+ const size_t reuse_size = (num_query_in_progress - 1) * sizeof(query_param_t);
|
|
|
+ const size_t update_query_len = MAX_UPDATE_QUERY_LEN(num_query);
|
|
|
+
|
|
|
+ if (update_query_len > reuse_size)
|
|
|
+ sz += update_query_len - reuse_size;
|
|
|
+ }
|
|
|
+
|
|
|
+ multiple_query_ctx_t * const query_ctx = h2o_mem_alloc_shared(&req->pool,
|
|
|
+ sz,
|
|
|
+ cleanup_multiple_query);
|
|
|
+
|
|
|
+ if (query_ctx) {
|
|
|
+ memset(query_ctx, 0, sz);
|
|
|
+ query_ctx->num_query = num_query;
|
|
|
+ query_ctx->num_query_in_progress = num_query_in_progress;
|
|
|
+ query_ctx->req = req;
|
|
|
+ query_ctx->do_update = do_update;
|
|
|
+ query_ctx->query_param = (query_param_t *) ((char *) query_ctx + base_size);
|
|
|
+ initialize_ids(num_query, query_ctx->res, &ctx->random_seed);
|
|
|
+
|
|
|
+ for (size_t i = 0; i < query_ctx->num_query_in_progress; i++) {
|
|
|
+ query_ctx->query_param[i].ctx = query_ctx;
|
|
|
+ // We need a copy of id because the original may be overwritten
|
|
|
+ // by a completed query.
|
|
|
+ query_ctx->query_param[i].id = htonl(query_ctx->res[i].id);
|
|
|
+ query_ctx->query_param[i].id_format = 1;
|
|
|
+ query_ctx->query_param[i].id_len = sizeof(query_ctx->query_param[i].id);
|
|
|
+ query_ctx->query_param[i].id_pointer = (const char *) &query_ctx->query_param[i].id;
|
|
|
+ query_ctx->query_param[i].param.command = WORLD_TABLE_NAME;
|
|
|
+ query_ctx->query_param[i].param.nParams = 1;
|
|
|
+ query_ctx->query_param[i].param.on_error = on_multiple_query_error;
|
|
|
+ query_ctx->query_param[i].param.on_result = on_multiple_query_result;
|
|
|
+ query_ctx->query_param[i].param.on_timeout = on_multiple_query_timeout;
|
|
|
+ query_ctx->query_param[i].param.paramFormats = &query_ctx->query_param[i].id_format;
|
|
|
+ query_ctx->query_param[i].param.paramLengths = &query_ctx->query_param[i].id_len;
|
|
|
+ query_ctx->query_param[i].param.paramValues = &query_ctx->query_param[i].id_pointer;
|
|
|
+ query_ctx->query_param[i].param.flags = IS_PREPARED;
|
|
|
+ query_ctx->query_param[i].param.resultFormat = 1;
|
|
|
+
|
|
|
+ if (execute_query(ctx, &query_ctx->query_param[i].param)) {
|
|
|
+ query_ctx->num_query_in_progress = i;
|
|
|
+ send_service_unavailable_error(DB_REQ_ERROR, req);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ h2o_mem_addref_shared(query_ctx);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create a JSON generator while the queries are processed.
|
|
|
+ query_ctx->gen = get_json_generator(&req->pool);
|
|
|
+
|
|
|
+ if (!query_ctx->gen)
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, req);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, req);
|
|
|
+
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+static void do_updates(multiple_query_ctx_t *query_ctx)
|
|
|
{
|
|
|
- const update_ctx_t * const update_ctx = data;
|
|
|
+ thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
+ event_loop.h2o_ctx,
|
|
|
+ query_ctx->req->conn->ctx);
|
|
|
+ char *iter = (char *) (query_ctx->query_param + 1);
|
|
|
+ size_t sz = MAX_UPDATE_QUERY_LEN(query_ctx->num_result);
|
|
|
+
|
|
|
+ // Sort the results to avoid database deadlock.
|
|
|
+ qsort(query_ctx->res, query_ctx->num_result, sizeof(*query_ctx->res), compare_items);
|
|
|
+ query_ctx->query_param->param.command = iter;
|
|
|
+ query_ctx->query_param->param.nParams = 0;
|
|
|
+ query_ctx->query_param->param.on_result = on_update_result;
|
|
|
+ query_ctx->query_param->param.paramFormats = NULL;
|
|
|
+ query_ctx->query_param->param.paramLengths = NULL;
|
|
|
+ query_ctx->query_param->param.paramValues = NULL;
|
|
|
+ query_ctx->query_param->param.flags = 0;
|
|
|
+ query_ctx->res->random_number = get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
+
|
|
|
+ int c = snprintf(iter,
|
|
|
+ sz,
|
|
|
+ UPDATE_QUERY_BEGIN,
|
|
|
+ query_ctx->res->id,
|
|
|
+ query_ctx->res->random_number);
|
|
|
+
|
|
|
+ if (c < 0 || (size_t) c >= sz)
|
|
|
+ goto error;
|
|
|
+
|
|
|
+ iter += c;
|
|
|
+ sz -= c;
|
|
|
+
|
|
|
+ for (size_t i = 1; i < query_ctx->num_result; i++) {
|
|
|
+ query_ctx->res[i].random_number = get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
+ c = snprintf(iter,
|
|
|
+ sz,
|
|
|
+ UPDATE_QUERY_ELEM,
|
|
|
+ query_ctx->res[i].id,
|
|
|
+ query_ctx->res[i].random_number);
|
|
|
+
|
|
|
+ if (c < 0 || (size_t) c >= sz)
|
|
|
+ goto error;
|
|
|
+
|
|
|
+ iter += c;
|
|
|
+ sz -= c;
|
|
|
+ }
|
|
|
+
|
|
|
+ c = snprintf(iter, sz, UPDATE_QUERY_END);
|
|
|
+
|
|
|
+ if (c < 0 || (size_t) c >= sz)
|
|
|
+ goto error;
|
|
|
+
|
|
|
+ if (execute_query(ctx, &query_ctx->query_param->param))
|
|
|
+ send_service_unavailable_error(DB_REQ_ERROR, query_ctx->req);
|
|
|
+ else {
|
|
|
+ query_ctx->num_query_in_progress++;
|
|
|
+ h2o_mem_addref_shared(query_ctx);
|
|
|
+ }
|
|
|
|
|
|
- if (update_ctx->gen)
|
|
|
- yajl_gen_free(update_ctx->gen);
|
|
|
+ return;
|
|
|
+error:
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
|
|
|
}
|
|
|
|
|
|
static size_t get_query_number(h2o_req_t *req)
|
|
@@ -216,8 +344,12 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
|
|
|
query_ctx->gen = NULL;
|
|
|
send_service_unavailable_error(DB_REQ_ERROR, query_ctx->req);
|
|
|
}
|
|
|
- else if (query_ctx->num_result == query_ctx->num_query)
|
|
|
- serialize_items(query_ctx->res, query_ctx->num_result, query_ctx->gen, query_ctx->req);
|
|
|
+ else if (query_ctx->num_result == query_ctx->num_query) {
|
|
|
+ if (query_ctx->do_update)
|
|
|
+ do_updates(query_ctx);
|
|
|
+ else
|
|
|
+ serialize_items(query_ctx->res, query_ctx->num_result, query_ctx->gen, query_ctx->req);
|
|
|
+ }
|
|
|
|
|
|
h2o_mem_release_shared(query_ctx);
|
|
|
}
|
|
@@ -300,107 +432,23 @@ static void on_single_query_timeout(db_query_param_t *param)
|
|
|
send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
|
|
|
}
|
|
|
|
|
|
-static void on_update_error(db_query_param_t *param, const char *error_string)
|
|
|
-{
|
|
|
- const update_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(update_param_t, param, param);
|
|
|
- update_ctx_t * const update_ctx = query_param->ctx;
|
|
|
-
|
|
|
- if (update_ctx->gen) {
|
|
|
- yajl_gen_free(update_ctx->gen);
|
|
|
- update_ctx->gen = NULL;
|
|
|
- send_error(BAD_GATEWAY, error_string, update_ctx->req);
|
|
|
- }
|
|
|
-
|
|
|
- update_ctx->num_query_in_progress--;
|
|
|
- h2o_mem_release_shared(update_ctx);
|
|
|
-}
|
|
|
-
|
|
|
static result_return_t on_update_result(db_query_param_t *param, PGresult *result)
|
|
|
{
|
|
|
- update_param_t * const update_param = H2O_STRUCT_FROM_MEMBER(update_param_t, param, param);
|
|
|
- update_ctx_t * const update_ctx = update_param->ctx;
|
|
|
- result_return_t ret = DONE;
|
|
|
-
|
|
|
- if (update_ctx->gen) {
|
|
|
- if (update_param->update) {
|
|
|
- if (PQresultStatus(result) == PGRES_COMMAND_OK) {
|
|
|
- thread_context_t * const ctx =
|
|
|
- H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
- event_loop.h2o_ctx,
|
|
|
- update_ctx->req->conn->ctx);
|
|
|
- const size_t num_query_remaining = update_ctx->num_query - update_ctx->num_result;
|
|
|
-
|
|
|
- update_ctx->num_query_in_progress--;
|
|
|
-
|
|
|
- if (update_ctx->num_query_in_progress < num_query_remaining) {
|
|
|
- const size_t idx = update_ctx->num_result + update_ctx->num_query_in_progress;
|
|
|
-
|
|
|
- update_param->random_number = get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
- update_param->update = false;
|
|
|
- snprintf(update_param->command,
|
|
|
- MAX_UPDATE_QUERY_LEN,
|
|
|
- UPDATE_QUERY,
|
|
|
- update_ctx->res[idx].id,
|
|
|
- update_ctx->res[idx].id,
|
|
|
- update_param->random_number);
|
|
|
-
|
|
|
- if (!execute_query(ctx, &update_param->param)) {
|
|
|
- update_ctx->num_query_in_progress++;
|
|
|
- PQclear(result);
|
|
|
- return DONE;
|
|
|
- }
|
|
|
-
|
|
|
- yajl_gen_free(update_ctx->gen);
|
|
|
- update_ctx->gen = NULL;
|
|
|
- send_service_unavailable_error(DB_REQ_ERROR, update_ctx->req);
|
|
|
- }
|
|
|
- else if (update_ctx->num_result == update_ctx->num_query)
|
|
|
- serialize_items(update_ctx->res,
|
|
|
- update_ctx->num_result,
|
|
|
- update_ctx->gen,
|
|
|
- update_ctx->req);
|
|
|
-
|
|
|
- h2o_mem_release_shared(update_ctx);
|
|
|
- }
|
|
|
- else {
|
|
|
- LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
|
- on_update_error(param, DB_ERROR);
|
|
|
- }
|
|
|
- }
|
|
|
- else if (PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
|
- process_result(result, update_ctx->res + update_ctx->num_result);
|
|
|
- update_ctx->res[update_ctx->num_result].random_number = update_param->random_number;
|
|
|
- update_ctx->num_result++;
|
|
|
- update_param->update = true;
|
|
|
- ret = SUCCESS;
|
|
|
- }
|
|
|
- else {
|
|
|
- LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
|
- on_update_error(param, DB_ERROR);
|
|
|
- }
|
|
|
+ query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t, param, param);
|
|
|
+ multiple_query_ctx_t * const query_ctx = query_param->ctx;
|
|
|
+
|
|
|
+ if (PQresultStatus(result) == PGRES_COMMAND_OK) {
|
|
|
+ query_ctx->num_query_in_progress--;
|
|
|
+ serialize_items(query_ctx->res, query_ctx->num_result, query_ctx->gen, query_ctx->req);
|
|
|
+ h2o_mem_release_shared(query_ctx);
|
|
|
}
|
|
|
else {
|
|
|
- update_ctx->num_query_in_progress--;
|
|
|
- h2o_mem_release_shared(update_ctx);
|
|
|
+ LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
|
+ on_multiple_query_error(param, DB_ERROR);
|
|
|
}
|
|
|
|
|
|
PQclear(result);
|
|
|
- return ret;
|
|
|
-}
|
|
|
-
|
|
|
-static void on_update_timeout(db_query_param_t *param)
|
|
|
-{
|
|
|
- const update_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(update_param_t, param, param);
|
|
|
- update_ctx_t * const update_ctx = query_param->ctx;
|
|
|
-
|
|
|
- if (update_ctx->gen) {
|
|
|
- yajl_gen_free(update_ctx->gen);
|
|
|
- update_ctx->gen = NULL;
|
|
|
- send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, update_ctx->req);
|
|
|
- }
|
|
|
-
|
|
|
- update_ctx->num_query_in_progress--;
|
|
|
- h2o_mem_release_shared(update_ctx);
|
|
|
+ return DONE;
|
|
|
}
|
|
|
|
|
|
static void process_result(PGresult *result, query_result_t *out)
|
|
@@ -467,75 +515,7 @@ error_yajl:
|
|
|
int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
|
{
|
|
|
IGNORE_FUNCTION_PARAMETER(self);
|
|
|
-
|
|
|
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
- event_loop.h2o_ctx,
|
|
|
- req->conn->ctx);
|
|
|
-
|
|
|
- const size_t num_query = get_query_number(req);
|
|
|
-
|
|
|
- // MAX_QUERIES is a relatively small number, so assume no overflow in the following
|
|
|
- // arithmetic operations.
|
|
|
- assert(num_query <= MAX_QUERIES);
|
|
|
-
|
|
|
- size_t base_size = offsetof(multiple_query_ctx_t, res) + num_query * sizeof(query_result_t);
|
|
|
-
|
|
|
- base_size = ((base_size + _Alignof(query_param_t) - 1) / _Alignof(query_param_t));
|
|
|
- base_size = base_size * _Alignof(query_param_t);
|
|
|
-
|
|
|
- const size_t num_query_in_progress = MIN(num_query, ctx->config->max_db_conn_num);
|
|
|
- const size_t sz = base_size + num_query_in_progress * sizeof(query_param_t);
|
|
|
-
|
|
|
- multiple_query_ctx_t * const query_ctx = h2o_mem_alloc_shared(&req->pool,
|
|
|
- sz,
|
|
|
- cleanup_multiple_query);
|
|
|
-
|
|
|
- if (query_ctx) {
|
|
|
- memset(query_ctx, 0, sz);
|
|
|
- query_ctx->num_query = num_query;
|
|
|
- query_ctx->num_query_in_progress = num_query_in_progress;
|
|
|
- query_ctx->req = req;
|
|
|
- query_ctx->query_param = (query_param_t *) ((char *) query_ctx + base_size);
|
|
|
- initialize_ids(num_query, query_ctx->res, &ctx->random_seed);
|
|
|
-
|
|
|
- for (size_t i = 0; i < query_ctx->num_query_in_progress; i++) {
|
|
|
- query_ctx->query_param[i].ctx = query_ctx;
|
|
|
- // We need a copy of id because the original may be overwritten
|
|
|
- // by a completed query.
|
|
|
- query_ctx->query_param[i].id = htonl(query_ctx->res[i].id);
|
|
|
- query_ctx->query_param[i].id_format = 1;
|
|
|
- query_ctx->query_param[i].id_len = sizeof(query_ctx->query_param[i].id);
|
|
|
- query_ctx->query_param[i].id_pointer = (const char *) &query_ctx->query_param[i].id;
|
|
|
- query_ctx->query_param[i].param.command = WORLD_TABLE_NAME;
|
|
|
- query_ctx->query_param[i].param.nParams = 1;
|
|
|
- query_ctx->query_param[i].param.on_error = on_multiple_query_error;
|
|
|
- query_ctx->query_param[i].param.on_result = on_multiple_query_result;
|
|
|
- query_ctx->query_param[i].param.on_timeout = on_multiple_query_timeout;
|
|
|
- query_ctx->query_param[i].param.paramFormats = &query_ctx->query_param[i].id_format;
|
|
|
- query_ctx->query_param[i].param.paramLengths = &query_ctx->query_param[i].id_len;
|
|
|
- query_ctx->query_param[i].param.paramValues = &query_ctx->query_param[i].id_pointer;
|
|
|
- query_ctx->query_param[i].param.flags = IS_PREPARED;
|
|
|
- query_ctx->query_param[i].param.resultFormat = 1;
|
|
|
-
|
|
|
- if (execute_query(ctx, &query_ctx->query_param[i].param)) {
|
|
|
- query_ctx->num_query_in_progress = i;
|
|
|
- send_service_unavailable_error(DB_REQ_ERROR, req);
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- h2o_mem_addref_shared(query_ctx);
|
|
|
- }
|
|
|
-
|
|
|
- // Create a JSON generator while the queries are processed.
|
|
|
- query_ctx->gen = get_json_generator(&req->pool);
|
|
|
-
|
|
|
- if (!query_ctx->gen)
|
|
|
- send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, req);
|
|
|
- }
|
|
|
- else
|
|
|
- send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, req);
|
|
|
-
|
|
|
- return 0;
|
|
|
+ return do_multiple_queries(false, req);
|
|
|
}
|
|
|
|
|
|
int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
@@ -577,72 +557,5 @@ int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
|
int updates(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
|
{
|
|
|
IGNORE_FUNCTION_PARAMETER(self);
|
|
|
-
|
|
|
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
- event_loop.h2o_ctx,
|
|
|
- req->conn->ctx);
|
|
|
-
|
|
|
- const size_t num_query = get_query_number(req);
|
|
|
-
|
|
|
- // MAX_QUERIES is a relatively small number, so assume no overflow in the following
|
|
|
- // arithmetic operations.
|
|
|
- assert(num_query <= MAX_QUERIES);
|
|
|
-
|
|
|
- size_t base_size = offsetof(update_ctx_t, res) + num_query * sizeof(query_result_t);
|
|
|
-
|
|
|
- base_size = ((base_size + _Alignof(update_param_t) - 1) / _Alignof(update_param_t));
|
|
|
- base_size = base_size * _Alignof(update_param_t);
|
|
|
-
|
|
|
- const size_t num_query_in_progress = MIN(num_query, ctx->config->max_db_conn_num);
|
|
|
- const size_t struct_size = base_size + num_query_in_progress * sizeof(update_param_t);
|
|
|
- const size_t sz = struct_size + num_query_in_progress * MAX_UPDATE_QUERY_LEN;
|
|
|
- update_ctx_t * const update_ctx = h2o_mem_alloc_shared(&req->pool, sz, cleanup_update);
|
|
|
-
|
|
|
- if (update_ctx) {
|
|
|
- memset(update_ctx, 0, struct_size);
|
|
|
- update_ctx->num_query = num_query;
|
|
|
- update_ctx->num_query_in_progress = num_query_in_progress;
|
|
|
- update_ctx->req = req;
|
|
|
- update_ctx->update_param = (update_param_t *) ((char *) update_ctx + base_size);
|
|
|
- initialize_ids(num_query, update_ctx->res, &ctx->random_seed);
|
|
|
-
|
|
|
- char *command = (char *) update_ctx + struct_size;
|
|
|
-
|
|
|
- for (size_t i = 0; i < update_ctx->num_query_in_progress; i++) {
|
|
|
- update_ctx->update_param[i].command = command;
|
|
|
- command += MAX_UPDATE_QUERY_LEN;
|
|
|
- update_ctx->update_param[i].ctx = update_ctx;
|
|
|
- update_ctx->update_param[i].param.command = update_ctx->update_param[i].command;
|
|
|
- update_ctx->update_param[i].param.on_error = on_update_error;
|
|
|
- update_ctx->update_param[i].param.on_result = on_update_result;
|
|
|
- update_ctx->update_param[i].param.on_timeout = on_update_timeout;
|
|
|
- update_ctx->update_param[i].param.resultFormat = 1;
|
|
|
- update_ctx->update_param[i].random_number =
|
|
|
- get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
- snprintf(update_ctx->update_param[i].command,
|
|
|
- MAX_UPDATE_QUERY_LEN,
|
|
|
- UPDATE_QUERY,
|
|
|
- update_ctx->res[i].id,
|
|
|
- update_ctx->res[i].id,
|
|
|
- update_ctx->update_param[i].random_number);
|
|
|
-
|
|
|
- if (execute_query(ctx, &update_ctx->update_param[i].param)) {
|
|
|
- update_ctx->num_query_in_progress = i;
|
|
|
- send_service_unavailable_error(DB_REQ_ERROR, req);
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- h2o_mem_addref_shared(update_ctx);
|
|
|
- }
|
|
|
-
|
|
|
- // Create a JSON generator while the queries are processed.
|
|
|
- update_ctx->gen = get_json_generator(&req->pool);
|
|
|
-
|
|
|
- if (!update_ctx->gen)
|
|
|
- send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, req);
|
|
|
- }
|
|
|
- else
|
|
|
- send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, req);
|
|
|
-
|
|
|
- return 0;
|
|
|
+ return do_multiple_queries(true, req);
|
|
|
}
|