|
@@ -22,6 +22,7 @@
|
|
#include <stdbool.h>
|
|
#include <stdbool.h>
|
|
#include <stddef.h>
|
|
#include <stddef.h>
|
|
#include <stdint.h>
|
|
#include <stdint.h>
|
|
|
|
+#include <stdio.h>
|
|
#include <arpa/inet.h>
|
|
#include <arpa/inet.h>
|
|
#include <postgresql/libpq-fe.h>
|
|
#include <postgresql/libpq-fe.h>
|
|
#include <yajl/yajl_gen.h>
|
|
#include <yajl/yajl_gen.h>
|
|
@@ -40,6 +41,7 @@
|
|
#define RANDOM_NUM_KEY "randomNumber"
|
|
#define RANDOM_NUM_KEY "randomNumber"
|
|
|
|
|
|
typedef struct multiple_query_ctx_t multiple_query_ctx_t;
|
|
typedef struct multiple_query_ctx_t multiple_query_ctx_t;
|
|
|
|
+typedef struct update_ctx_t update_ctx_t;
|
|
|
|
|
|
typedef struct {
|
|
typedef struct {
|
|
uint32_t id;
|
|
uint32_t id;
|
|
@@ -74,13 +76,38 @@ struct multiple_query_ctx_t {
|
|
query_result_t res[];
|
|
query_result_t res[];
|
|
};
|
|
};
|
|
|
|
|
|
|
|
+typedef struct {
|
|
|
|
+ char *command;
|
|
|
|
+ update_ctx_t *ctx;
|
|
|
|
+ uint32_t random_number;
|
|
|
|
+ bool update;
|
|
|
|
+ db_query_param_t param;
|
|
|
|
+} update_param_t;
|
|
|
|
+
|
|
|
|
+struct update_ctx_t {
|
|
|
|
+ yajl_gen gen; // also an error flag
|
|
|
|
+ h2o_req_t *req;
|
|
|
|
+ update_param_t *update_param;
|
|
|
|
+ size_t num_query;
|
|
|
|
+ size_t num_query_in_progress;
|
|
|
|
+ size_t num_result;
|
|
|
|
+ query_result_t res[];
|
|
|
|
+};
|
|
|
|
+
|
|
static void cleanup_multiple_query(void *data);
|
|
static void cleanup_multiple_query(void *data);
|
|
|
|
+static void cleanup_update(void *data);
|
|
|
|
+static size_t get_query_number(h2o_req_t *req);
|
|
|
|
+static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed);
|
|
static void on_multiple_query_error(db_query_param_t *param, const char *error_string);
|
|
static void on_multiple_query_error(db_query_param_t *param, const char *error_string);
|
|
static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result);
|
|
static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result);
|
|
static void on_multiple_query_timeout(db_query_param_t *param);
|
|
static void on_multiple_query_timeout(db_query_param_t *param);
|
|
static void on_single_query_error(db_query_param_t *param, const char *error_string);
|
|
static void on_single_query_error(db_query_param_t *param, const char *error_string);
|
|
static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result);
|
|
static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result);
|
|
static void on_single_query_timeout(db_query_param_t *param);
|
|
static void on_single_query_timeout(db_query_param_t *param);
|
|
|
|
+static void on_update_error(db_query_param_t *param, const char *error_string);
|
|
|
|
+static result_return_t on_update_result(db_query_param_t *param, PGresult *result);
|
|
|
|
+static void on_update_timeout(db_query_param_t *param);
|
|
|
|
+static void process_result(PGresult *result, query_result_t *out);
|
|
static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen);
|
|
static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen);
|
|
static void serialize_items(const query_result_t *res,
|
|
static void serialize_items(const query_result_t *res,
|
|
size_t num_result,
|
|
size_t num_result,
|
|
@@ -95,6 +122,54 @@ static void cleanup_multiple_query(void *data)
|
|
yajl_gen_free(query_ctx->gen);
|
|
yajl_gen_free(query_ctx->gen);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+static void cleanup_update(void *data)
|
|
|
|
+{
|
|
|
|
+ const update_ctx_t * const update_ctx = data;
|
|
|
|
+
|
|
|
|
+ if (update_ctx->gen)
|
|
|
|
+ yajl_gen_free(update_ctx->gen);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static size_t get_query_number(h2o_req_t *req)
|
|
|
|
+{
|
|
|
|
+ int num_query = 0;
|
|
|
|
+
|
|
|
|
+ if (req->query_at < SIZE_MAX) {
|
|
|
|
+ const char * const n = get_query_param(req->path.base + req->query_at + 1,
|
|
|
|
+ req->path.len - req->query_at - 1,
|
|
|
|
+ QUERIES_PARAMETER,
|
|
|
|
+ sizeof(QUERIES_PARAMETER) - 1);
|
|
|
|
+
|
|
|
|
+ if (n)
|
|
|
|
+ num_query = atoi(n);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (num_query < 1)
|
|
|
|
+ num_query = 1;
|
|
|
|
+ else if (num_query > MAX_QUERIES)
|
|
|
|
+ num_query = MAX_QUERIES;
|
|
|
|
+
|
|
|
|
+ return num_query;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed)
|
|
|
|
+{
|
|
|
|
+ // MAX_ID is a relatively small number, so allocate on the stack.
|
|
|
|
+ DEFINE_BITSET(bitset, MAX_ID);
|
|
|
|
+
|
|
|
|
+ size_t max_rand = MAX_ID - num_query + 1;
|
|
|
|
+
|
|
|
|
+ for (size_t i = 0; i < num_query; i++) {
|
|
|
|
+ res[i].id = get_random_number(max_rand, seed);
|
|
|
|
+
|
|
|
|
+ if (BITSET_ISSET(res[i].id, bitset))
|
|
|
|
+ res[i].id = max_rand - 1;
|
|
|
|
+
|
|
|
|
+ BITSET_SET(res[i].id++, bitset);
|
|
|
|
+ max_rand++;
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
static void on_multiple_query_error(db_query_param_t *param, const char *error_string)
|
|
static void on_multiple_query_error(db_query_param_t *param, const char *error_string)
|
|
{
|
|
{
|
|
const query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t,
|
|
const query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t,
|
|
@@ -123,23 +198,14 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
|
|
thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
event_loop.h2o_ctx,
|
|
event_loop.h2o_ctx,
|
|
query_ctx->req->conn->ctx);
|
|
query_ctx->req->conn->ctx);
|
|
- uint32_t * const id = &query_ctx->res[query_ctx->num_result].id;
|
|
|
|
- uint32_t * const random_number = &query_ctx->res[query_ctx->num_result].random_number;
|
|
|
|
|
|
|
|
- assert(PQnfields(result) == 2);
|
|
|
|
- assert(PQntuples(result) == 1);
|
|
|
|
- assert(PQgetlength(result, 0, 0) == sizeof(*id));
|
|
|
|
- assert(PQgetlength(result, 0, 1) == sizeof(*random_number));
|
|
|
|
- // Use memcpy() in case the result is not aligned; the reason we are
|
|
|
|
- // copying over the id is because the results may arrive in any order.
|
|
|
|
- memcpy(id, PQgetvalue(result, 0, 0), sizeof(*id));
|
|
|
|
- memcpy(random_number, PQgetvalue(result, 0, 1), sizeof(*random_number));
|
|
|
|
- *id = ntohl(*id);
|
|
|
|
- *random_number = ntohl(*random_number);
|
|
|
|
|
|
+ process_result(result, query_ctx->res + query_ctx->num_result);
|
|
query_ctx->num_query_in_progress--;
|
|
query_ctx->num_query_in_progress--;
|
|
query_ctx->num_result++;
|
|
query_ctx->num_result++;
|
|
|
|
|
|
- if (query_ctx->num_query_in_progress < query_ctx->num_query - query_ctx->num_result) {
|
|
|
|
|
|
+ const size_t num_query_remaining = query_ctx->num_query - query_ctx->num_result;
|
|
|
|
+
|
|
|
|
+ if (query_ctx->num_query_in_progress < num_query_remaining) {
|
|
const size_t idx = query_ctx->num_result + query_ctx->num_query_in_progress;
|
|
const size_t idx = query_ctx->num_result + query_ctx->num_query_in_progress;
|
|
|
|
|
|
query_param->id = htonl(query_ctx->res[idx].id);
|
|
query_param->id = htonl(query_ctx->res[idx].id);
|
|
@@ -244,6 +310,144 @@ static void on_single_query_timeout(db_query_param_t *param)
|
|
send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
|
|
send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+static void on_update_error(db_query_param_t *param, const char *error_string)
|
|
|
|
+{
|
|
|
|
+ const update_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(update_param_t,
|
|
|
|
+ param,
|
|
|
|
+ param);
|
|
|
|
+ update_ctx_t * const update_ctx = query_param->ctx;
|
|
|
|
+
|
|
|
|
+ if (update_ctx->gen) {
|
|
|
|
+ yajl_gen_free(update_ctx->gen);
|
|
|
|
+ update_ctx->gen = NULL;
|
|
|
|
+ send_error(BAD_GATEWAY, error_string, update_ctx->req);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ update_ctx->num_query_in_progress--;
|
|
|
|
+ h2o_mem_release_shared(update_ctx);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static result_return_t on_update_result(db_query_param_t *param, PGresult *result)
|
|
|
|
+{
|
|
|
|
+ update_param_t * const update_param = H2O_STRUCT_FROM_MEMBER(update_param_t,
|
|
|
|
+ param,
|
|
|
|
+ param);
|
|
|
|
+ update_ctx_t * const update_ctx = update_param->ctx;
|
|
|
|
+ result_return_t ret = DONE;
|
|
|
|
+
|
|
|
|
+ if (update_ctx->gen) {
|
|
|
|
+ if (update_param->update) {
|
|
|
|
+ if (PQresultStatus(result) == PGRES_COMMAND_OK) {
|
|
|
|
+ thread_context_t * const ctx =
|
|
|
|
+ H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
|
+ event_loop.h2o_ctx,
|
|
|
|
+ update_ctx->req->conn->ctx);
|
|
|
|
+ const size_t num_query_remaining =
|
|
|
|
+ update_ctx->num_query - update_ctx->num_result;
|
|
|
|
+
|
|
|
|
+ update_ctx->num_query_in_progress--;
|
|
|
|
+
|
|
|
|
+ if (update_ctx->num_query_in_progress < num_query_remaining) {
|
|
|
|
+ const size_t idx = update_ctx->num_result +
|
|
|
|
+ update_ctx->num_query_in_progress;
|
|
|
|
+
|
|
|
|
+ update_param->random_number =
|
|
|
|
+ get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
|
+ update_param->update = false;
|
|
|
|
+ snprintf(update_param->command,
|
|
|
|
+ MAX_UPDATE_QUERY_LEN,
|
|
|
|
+ UPDATE_QUERY,
|
|
|
|
+ update_ctx->res[idx].id,
|
|
|
|
+ update_ctx->res[idx].id,
|
|
|
|
+ update_param->random_number);
|
|
|
|
+
|
|
|
|
+ if (!execute_query(ctx, &update_param->param)) {
|
|
|
|
+ update_ctx->num_query_in_progress++;
|
|
|
|
+ PQclear(result);
|
|
|
|
+ return DONE;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ yajl_gen_free(update_ctx->gen);
|
|
|
|
+ update_ctx->gen = NULL;
|
|
|
|
+ send_service_unavailable_error(DB_REQ_ERROR,
|
|
|
|
+ update_ctx->req);
|
|
|
|
+ }
|
|
|
|
+ else if (update_ctx->num_result == update_ctx->num_query)
|
|
|
|
+ serialize_items(update_ctx->res,
|
|
|
|
+ update_ctx->num_result,
|
|
|
|
+ update_ctx->gen,
|
|
|
|
+ update_ctx->req);
|
|
|
|
+
|
|
|
|
+ h2o_mem_release_shared(update_ctx);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ on_update_error(param, PQresultErrorMessage(result));
|
|
|
|
+ }
|
|
|
|
+ else if (PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
|
|
+ process_result(result, update_ctx->res + update_ctx->num_result);
|
|
|
|
+ update_ctx->res[update_ctx->num_result].random_number =
|
|
|
|
+ update_param->random_number;
|
|
|
|
+ update_ctx->num_result++;
|
|
|
|
+ update_param->update = true;
|
|
|
|
+ ret = SUCCESS;
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ on_update_error(param, PQresultErrorMessage(result));
|
|
|
|
+ }
|
|
|
|
+ else {
|
|
|
|
+ update_ctx->num_query_in_progress--;
|
|
|
|
+ h2o_mem_release_shared(update_ctx);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ PQclear(result);
|
|
|
|
+ return ret;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static void on_update_timeout(db_query_param_t *param)
|
|
|
|
+{
|
|
|
|
+ const update_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(update_param_t,
|
|
|
|
+ param,
|
|
|
|
+ param);
|
|
|
|
+ update_ctx_t * const update_ctx = query_param->ctx;
|
|
|
|
+
|
|
|
|
+ if (update_ctx->gen) {
|
|
|
|
+ yajl_gen_free(update_ctx->gen);
|
|
|
|
+ update_ctx->gen = NULL;
|
|
|
|
+ send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, update_ctx->req);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ update_ctx->num_query_in_progress--;
|
|
|
|
+ h2o_mem_release_shared(update_ctx);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static void process_result(PGresult *result, query_result_t *out)
|
|
|
|
+{
|
|
|
|
+ assert(PQnfields(result) == 2);
|
|
|
|
+ assert(PQntuples(result) == 1);
|
|
|
|
+
|
|
|
|
+ const char * const id = PQgetvalue(result, 0, 0);
|
|
|
|
+ const char * const random_number = PQgetvalue(result, 0, 1);
|
|
|
|
+
|
|
|
|
+ if (PQfformat(result, 0)) {
|
|
|
|
+ assert(PQgetlength(result, 0, 0) == sizeof(out->id));
|
|
|
|
+ // Use memcpy() in case the result is not aligned; the reason we are
|
|
|
|
+ // copying over the id is because the results may arrive in any order.
|
|
|
|
+ memcpy(&out->id, id, sizeof(out->id));
|
|
|
|
+ out->id = ntohl(out->id);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ out->id = atoi(id);
|
|
|
|
+
|
|
|
|
+ if (PQfformat(result, 1)) {
|
|
|
|
+ assert(PQgetlength(result, 0, 1) == sizeof(out->random_number));
|
|
|
|
+ // Use memcpy() in case the result is not aligned.
|
|
|
|
+ memcpy(&out->random_number, random_number, sizeof(out->random_number));
|
|
|
|
+ out->random_number = ntohl(out->random_number);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ out->random_number = atoi(random_number);
|
|
|
|
+}
|
|
|
|
+
|
|
static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen)
|
|
static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen)
|
|
{
|
|
{
|
|
CHECK_YAJL_STATUS(yajl_gen_map_open, gen);
|
|
CHECK_YAJL_STATUS(yajl_gen_map_open, gen);
|
|
@@ -284,29 +488,14 @@ int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
event_loop.h2o_ctx,
|
|
event_loop.h2o_ctx,
|
|
req->conn->ctx);
|
|
req->conn->ctx);
|
|
- int num_query = 0;
|
|
|
|
-
|
|
|
|
- if (req->query_at < SIZE_MAX) {
|
|
|
|
- const char * const n = get_query_param(req->path.base + req->query_at + 1,
|
|
|
|
- req->path.len - req->query_at - 1,
|
|
|
|
- QUERIES_PARAMETER,
|
|
|
|
- sizeof(QUERIES_PARAMETER) - 1);
|
|
|
|
-
|
|
|
|
- if (n)
|
|
|
|
- num_query = atoi(n);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (num_query < 1)
|
|
|
|
- num_query = 1;
|
|
|
|
- else if (num_query > MAX_QUERIES)
|
|
|
|
- num_query = MAX_QUERIES;
|
|
|
|
|
|
|
|
|
|
+ const size_t num_query = get_query_number(req);
|
|
size_t base_size = offsetof(multiple_query_ctx_t, res) + num_query * sizeof(query_result_t);
|
|
size_t base_size = offsetof(multiple_query_ctx_t, res) + num_query * sizeof(query_result_t);
|
|
|
|
|
|
base_size = ((base_size + _Alignof(query_param_t) - 1) / _Alignof(query_param_t));
|
|
base_size = ((base_size + _Alignof(query_param_t) - 1) / _Alignof(query_param_t));
|
|
base_size = base_size * _Alignof(query_param_t);
|
|
base_size = base_size * _Alignof(query_param_t);
|
|
|
|
|
|
- const size_t num_query_in_progress = MIN((size_t) num_query, ctx->config->max_db_conn_num);
|
|
|
|
|
|
+ const size_t num_query_in_progress = MIN(num_query, ctx->config->max_db_conn_num);
|
|
const size_t sz = base_size + num_query_in_progress * sizeof(query_param_t);
|
|
const size_t sz = base_size + num_query_in_progress * sizeof(query_param_t);
|
|
|
|
|
|
multiple_query_ctx_t * const query_ctx = h2o_mem_alloc_shared(&req->pool,
|
|
multiple_query_ctx_t * const query_ctx = h2o_mem_alloc_shared(&req->pool,
|
|
@@ -314,26 +503,12 @@ int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
cleanup_multiple_query);
|
|
cleanup_multiple_query);
|
|
|
|
|
|
if (query_ctx) {
|
|
if (query_ctx) {
|
|
- // MAX_ID is a relatively small number, so allocate on the stack.
|
|
|
|
- DEFINE_BITSET(bitset, MAX_ID);
|
|
|
|
-
|
|
|
|
memset(query_ctx, 0, sz);
|
|
memset(query_ctx, 0, sz);
|
|
query_ctx->num_query = num_query;
|
|
query_ctx->num_query = num_query;
|
|
query_ctx->num_query_in_progress = num_query_in_progress;
|
|
query_ctx->num_query_in_progress = num_query_in_progress;
|
|
query_ctx->req = req;
|
|
query_ctx->req = req;
|
|
query_ctx->query_param = (query_param_t *) ((char *) query_ctx + base_size);
|
|
query_ctx->query_param = (query_param_t *) ((char *) query_ctx + base_size);
|
|
-
|
|
|
|
- size_t max_rand = MAX_ID - query_ctx->num_query + 1;
|
|
|
|
-
|
|
|
|
- for (size_t i = 0; i < query_ctx->num_query; i++) {
|
|
|
|
- query_ctx->res[i].id = get_random_number(max_rand, &ctx->random_seed);
|
|
|
|
-
|
|
|
|
- if (BITSET_ISSET(query_ctx->res[i].id, bitset))
|
|
|
|
- query_ctx->res[i].id = max_rand - 1;
|
|
|
|
-
|
|
|
|
- BITSET_SET(query_ctx->res[i].id++, bitset);
|
|
|
|
- max_rand++;
|
|
|
|
- }
|
|
|
|
|
|
+ initialize_ids(num_query, query_ctx->res, &ctx->random_seed);
|
|
|
|
|
|
for (size_t i = 0; i < query_ctx->num_query_in_progress; i++) {
|
|
for (size_t i = 0; i < query_ctx->num_query_in_progress; i++) {
|
|
query_ctx->query_param[i].ctx = query_ctx;
|
|
query_ctx->query_param[i].ctx = query_ctx;
|
|
@@ -341,16 +516,20 @@ int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
// by a completed query.
|
|
// by a completed query.
|
|
query_ctx->query_param[i].id = htonl(query_ctx->res[i].id);
|
|
query_ctx->query_param[i].id = htonl(query_ctx->res[i].id);
|
|
query_ctx->query_param[i].id_format = 1;
|
|
query_ctx->query_param[i].id_format = 1;
|
|
- query_ctx->query_param[i].id_len = sizeof(query_ctx->res[i].id);
|
|
|
|
- query_ctx->query_param[i].id_pointer = (const char *) &query_ctx->query_param[i].id;
|
|
|
|
|
|
+ query_ctx->query_param[i].id_len = sizeof(query_ctx->query_param[i].id);
|
|
|
|
+ query_ctx->query_param[i].id_pointer =
|
|
|
|
+ (const char *) &query_ctx->query_param[i].id;
|
|
query_ctx->query_param[i].param.command = WORLD_TABLE_NAME;
|
|
query_ctx->query_param[i].param.command = WORLD_TABLE_NAME;
|
|
query_ctx->query_param[i].param.nParams = 1;
|
|
query_ctx->query_param[i].param.nParams = 1;
|
|
query_ctx->query_param[i].param.on_error = on_multiple_query_error;
|
|
query_ctx->query_param[i].param.on_error = on_multiple_query_error;
|
|
query_ctx->query_param[i].param.on_result = on_multiple_query_result;
|
|
query_ctx->query_param[i].param.on_result = on_multiple_query_result;
|
|
query_ctx->query_param[i].param.on_timeout = on_multiple_query_timeout;
|
|
query_ctx->query_param[i].param.on_timeout = on_multiple_query_timeout;
|
|
- query_ctx->query_param[i].param.paramFormats = &query_ctx->query_param[i].id_format;
|
|
|
|
- query_ctx->query_param[i].param.paramLengths = &query_ctx->query_param[i].id_len;
|
|
|
|
- query_ctx->query_param[i].param.paramValues = &query_ctx->query_param[i].id_pointer;
|
|
|
|
|
|
+ query_ctx->query_param[i].param.paramFormats =
|
|
|
|
+ &query_ctx->query_param[i].id_format;
|
|
|
|
+ query_ctx->query_param[i].param.paramLengths =
|
|
|
|
+ &query_ctx->query_param[i].id_len;
|
|
|
|
+ query_ctx->query_param[i].param.paramValues =
|
|
|
|
+ &query_ctx->query_param[i].id_pointer;
|
|
query_ctx->query_param[i].param.flags = IS_PREPARED;
|
|
query_ctx->query_param[i].param.flags = IS_PREPARED;
|
|
query_ctx->query_param[i].param.resultFormat = 1;
|
|
query_ctx->query_param[i].param.resultFormat = 1;
|
|
|
|
|
|
@@ -413,7 +592,69 @@ int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
|
|
|
|
int updates(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
int updates(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
{
|
|
{
|
|
- IGNORE_FUNCTION_PARAMETER(req);
|
|
|
|
IGNORE_FUNCTION_PARAMETER(self);
|
|
IGNORE_FUNCTION_PARAMETER(self);
|
|
- return -1;
|
|
|
|
|
|
+
|
|
|
|
+ thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
|
+ event_loop.h2o_ctx,
|
|
|
|
+ req->conn->ctx);
|
|
|
|
+
|
|
|
|
+ const size_t num_query = get_query_number(req);
|
|
|
|
+ size_t base_size = offsetof(update_ctx_t, res) + num_query * sizeof(query_result_t);
|
|
|
|
+
|
|
|
|
+ base_size = ((base_size + _Alignof(update_param_t) - 1) / _Alignof(update_param_t));
|
|
|
|
+ base_size = base_size * _Alignof(update_param_t);
|
|
|
|
+
|
|
|
|
+ const size_t num_query_in_progress = MIN(num_query, ctx->config->max_db_conn_num);
|
|
|
|
+ const size_t struct_size = base_size + num_query_in_progress * sizeof(update_param_t);
|
|
|
|
+ const size_t sz = struct_size + num_query_in_progress * MAX_UPDATE_QUERY_LEN;
|
|
|
|
+ update_ctx_t * const update_ctx = h2o_mem_alloc_shared(&req->pool, sz, cleanup_update);
|
|
|
|
+
|
|
|
|
+ if (update_ctx) {
|
|
|
|
+ memset(update_ctx, 0, struct_size);
|
|
|
|
+ update_ctx->num_query = num_query;
|
|
|
|
+ update_ctx->num_query_in_progress = num_query_in_progress;
|
|
|
|
+ update_ctx->req = req;
|
|
|
|
+ update_ctx->update_param = (update_param_t *) ((char *) update_ctx + base_size);
|
|
|
|
+ initialize_ids(num_query, update_ctx->res, &ctx->random_seed);
|
|
|
|
+
|
|
|
|
+ char *command = (char *) update_ctx + struct_size;
|
|
|
|
+
|
|
|
|
+ for (size_t i = 0; i < update_ctx->num_query_in_progress; i++) {
|
|
|
|
+ update_ctx->update_param[i].command = command;
|
|
|
|
+ command += MAX_UPDATE_QUERY_LEN;
|
|
|
|
+ update_ctx->update_param[i].ctx = update_ctx;
|
|
|
|
+ update_ctx->update_param[i].param.command =
|
|
|
|
+ update_ctx->update_param[i].command;
|
|
|
|
+ update_ctx->update_param[i].param.on_error = on_update_error;
|
|
|
|
+ update_ctx->update_param[i].param.on_result = on_update_result;
|
|
|
|
+ update_ctx->update_param[i].param.on_timeout = on_update_timeout;
|
|
|
|
+ update_ctx->update_param[i].param.resultFormat = 1;
|
|
|
|
+ update_ctx->update_param[i].random_number =
|
|
|
|
+ get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
|
+ snprintf(update_ctx->update_param[i].command,
|
|
|
|
+ MAX_UPDATE_QUERY_LEN,
|
|
|
|
+ UPDATE_QUERY,
|
|
|
|
+ update_ctx->res[i].id,
|
|
|
|
+ update_ctx->res[i].id,
|
|
|
|
+ update_ctx->update_param[i].random_number);
|
|
|
|
+
|
|
|
|
+ if (execute_query(ctx, &update_ctx->update_param[i].param)) {
|
|
|
|
+ update_ctx->num_query_in_progress = i;
|
|
|
|
+ send_service_unavailable_error(DB_REQ_ERROR, req);
|
|
|
|
+ return 0;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ h2o_mem_addref_shared(update_ctx);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Create a JSON generator while the queries are processed.
|
|
|
|
+ update_ctx->gen = get_json_generator(&req->pool);
|
|
|
|
+
|
|
|
|
+ if (!update_ctx->gen)
|
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
|
|
|
|
+
|
|
|
|
+ return 0;
|
|
}
|
|
}
|