|
@@ -19,8 +19,10 @@
|
|
|
|
|
|
#include <assert.h>
|
|
|
#include <h2o.h>
|
|
|
+#include <stdbool.h>
|
|
|
#include <stddef.h>
|
|
|
#include <stdint.h>
|
|
|
+#include <stdio.h>
|
|
|
#include <arpa/inet.h>
|
|
|
#include <postgresql/libpq-fe.h>
|
|
|
#include <yajl/yajl_gen.h>
|
|
@@ -38,13 +40,8 @@
|
|
|
#define QUERIES_PARAMETER "queries="
|
|
|
#define RANDOM_NUM_KEY "randomNumber"
|
|
|
|
|
|
-typedef enum {
|
|
|
- NO_UPDATE = 0,
|
|
|
- CREATE,
|
|
|
- COPY_1,
|
|
|
- COPY_2,
|
|
|
- UPDATE
|
|
|
-} update_state_t;
|
|
|
+typedef struct multiple_query_ctx_t multiple_query_ctx_t;
|
|
|
+typedef struct update_ctx_t update_ctx_t;
|
|
|
|
|
|
typedef struct {
|
|
|
uint32_t id;
|
|
@@ -52,41 +49,72 @@ typedef struct {
|
|
|
} query_result_t;
|
|
|
|
|
|
typedef struct {
|
|
|
+ multiple_query_ctx_t *ctx;
|
|
|
+ const char *id_pointer;
|
|
|
+ uint32_t id;
|
|
|
+ int id_format;
|
|
|
+ int id_len;
|
|
|
db_query_param_t param;
|
|
|
+} query_param_t;
|
|
|
+
|
|
|
+typedef struct {
|
|
|
const char *id_pointer;
|
|
|
h2o_req_t *req;
|
|
|
uint32_t id;
|
|
|
int id_format;
|
|
|
int id_len;
|
|
|
+ db_query_param_t param;
|
|
|
} single_query_ctx_t;
|
|
|
|
|
|
+struct multiple_query_ctx_t {
|
|
|
+ yajl_gen gen; // also an error flag
|
|
|
+ h2o_req_t *req;
|
|
|
+ query_param_t *query_param;
|
|
|
+ size_t num_query;
|
|
|
+ size_t num_query_in_progress;
|
|
|
+ size_t num_result;
|
|
|
+ query_result_t res[];
|
|
|
+};
|
|
|
+
|
|
|
typedef struct {
|
|
|
- single_query_ctx_t single;
|
|
|
- yajl_gen gen;
|
|
|
+ char *command;
|
|
|
+ update_ctx_t *ctx;
|
|
|
+ uint32_t random_number;
|
|
|
+ bool update;
|
|
|
+ db_query_param_t param;
|
|
|
+} update_param_t;
|
|
|
+
|
|
|
+struct update_ctx_t {
|
|
|
+ yajl_gen gen; // also an error flag
|
|
|
+ h2o_req_t *req;
|
|
|
+ update_param_t *update_param;
|
|
|
size_t num_query;
|
|
|
+ size_t num_query_in_progress;
|
|
|
size_t num_result;
|
|
|
- update_state_t update_state;
|
|
|
query_result_t res[];
|
|
|
-} multiple_query_ctx_t;
|
|
|
-
|
|
|
-static void cleanup_request(void *data);
|
|
|
-static int do_multiple_queries(update_state_t update_state, h2o_req_t *req);
|
|
|
-static void initialize_single_query_context(h2o_req_t *req,
|
|
|
- on_result_t on_result,
|
|
|
- single_query_ctx_t *query_ctx);
|
|
|
-static void on_database_error(db_query_param_t *param, const char *error_string);
|
|
|
-static void on_database_timeout(db_query_param_t *param);
|
|
|
+};
|
|
|
+
|
|
|
+static void cleanup_multiple_query(void *data);
|
|
|
+static void cleanup_update(void *data);
|
|
|
+static size_t get_query_number(h2o_req_t *req);
|
|
|
+static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed);
|
|
|
+static void on_multiple_query_error(db_query_param_t *param, const char *error_string);
|
|
|
static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result);
|
|
|
+static void on_multiple_query_timeout(db_query_param_t *param);
|
|
|
+static void on_single_query_error(db_query_param_t *param, const char *error_string);
|
|
|
static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result);
|
|
|
+static void on_single_query_timeout(db_query_param_t *param);
|
|
|
+static void on_update_error(db_query_param_t *param, const char *error_string);
|
|
|
static result_return_t on_update_result(db_query_param_t *param, PGresult *result);
|
|
|
-static int on_update_write_ready(db_query_param_t *param, PGconn *db_conn);
|
|
|
+static void on_update_timeout(db_query_param_t *param);
|
|
|
+static void process_result(PGresult *result, query_result_t *out);
|
|
|
static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen);
|
|
|
static void serialize_items(const query_result_t *res,
|
|
|
size_t num_result,
|
|
|
yajl_gen gen,
|
|
|
h2o_req_t *req);
|
|
|
|
|
|
-static void cleanup_request(void *data)
|
|
|
+static void cleanup_multiple_query(void *data)
|
|
|
{
|
|
|
const multiple_query_ctx_t * const query_ctx = data;
|
|
|
|
|
@@ -94,11 +122,16 @@ static void cleanup_request(void *data)
|
|
|
yajl_gen_free(query_ctx->gen);
|
|
|
}
|
|
|
|
|
|
-static int do_multiple_queries(update_state_t update_state, h2o_req_t *req)
|
|
|
+static void cleanup_update(void *data)
|
|
|
+{
|
|
|
+ const update_ctx_t * const update_ctx = data;
|
|
|
+
|
|
|
+ if (update_ctx->gen)
|
|
|
+ yajl_gen_free(update_ctx->gen);
|
|
|
+}
|
|
|
+
|
|
|
+static size_t get_query_number(h2o_req_t *req)
|
|
|
{
|
|
|
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
- event_loop.h2o_ctx,
|
|
|
- req->conn->ctx);
|
|
|
int num_query = 0;
|
|
|
|
|
|
if (req->query_at < SIZE_MAX) {
|
|
@@ -116,144 +149,118 @@ static int do_multiple_queries(update_state_t update_state, h2o_req_t *req)
|
|
|
else if (num_query > MAX_QUERIES)
|
|
|
num_query = MAX_QUERIES;
|
|
|
|
|
|
- const size_t sz = offsetof(multiple_query_ctx_t, res) + num_query * sizeof(query_result_t);
|
|
|
- multiple_query_ctx_t * const query_ctx = h2o_mem_alloc_shared(&req->pool, sz, cleanup_request);
|
|
|
-
|
|
|
- if (query_ctx) {
|
|
|
- // MAX_ID is a relatively small number, so allocate on the stack.
|
|
|
- DEFINE_BITSET(bitset, MAX_ID);
|
|
|
-
|
|
|
- initialize_single_query_context(req, on_multiple_query_result, &query_ctx->single);
|
|
|
- memset(&query_ctx->single + 1,
|
|
|
- 0,
|
|
|
- offsetof(multiple_query_ctx_t, res) - sizeof(query_ctx->single));
|
|
|
- query_ctx->num_query = num_query;
|
|
|
- query_ctx->update_state = update_state;
|
|
|
-
|
|
|
- size_t max_rand = MAX_ID - query_ctx->num_query + 1;
|
|
|
+ return num_query;
|
|
|
+}
|
|
|
|
|
|
- for (size_t i = 0; i < query_ctx->num_query; i++) {
|
|
|
- query_ctx->res[i].id = get_random_number(max_rand, &ctx->random_seed);
|
|
|
+static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed)
|
|
|
+{
|
|
|
+ // MAX_ID is a relatively small number, so allocate on the stack.
|
|
|
+ DEFINE_BITSET(bitset, MAX_ID);
|
|
|
|
|
|
- if (BITSET_ISSET(query_ctx->res[i].id, bitset))
|
|
|
- query_ctx->res[i].id = max_rand - 1;
|
|
|
+ size_t max_rand = MAX_ID - num_query + 1;
|
|
|
|
|
|
- BITSET_SET(query_ctx->res[i].id++, bitset);
|
|
|
- max_rand++;
|
|
|
- }
|
|
|
+ for (size_t i = 0; i < num_query; i++) {
|
|
|
+ res[i].id = get_random_number(max_rand, seed);
|
|
|
|
|
|
- query_ctx->single.id = htonl(query_ctx->res->id);
|
|
|
+ if (BITSET_ISSET(res[i].id, bitset))
|
|
|
+ res[i].id = max_rand - 1;
|
|
|
|
|
|
- if (execute_query(ctx, &query_ctx->single.param))
|
|
|
- send_service_unavailable_error(DB_REQ_ERROR, req);
|
|
|
- else
|
|
|
- // Create a JSON generator while the query is processed.
|
|
|
- query_ctx->gen = get_json_generator(&req->pool);
|
|
|
+ BITSET_SET(res[i].id++, bitset);
|
|
|
+ max_rand++;
|
|
|
}
|
|
|
- else
|
|
|
- send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
|
|
|
-
|
|
|
- return 0;
|
|
|
}
|
|
|
|
|
|
-static void initialize_single_query_context(h2o_req_t *req,
|
|
|
- on_result_t on_result,
|
|
|
- single_query_ctx_t *query_ctx)
|
|
|
+static void on_multiple_query_error(db_query_param_t *param, const char *error_string)
|
|
|
{
|
|
|
- memset(query_ctx, 0, sizeof(*query_ctx));
|
|
|
- query_ctx->id_format = 1;
|
|
|
- query_ctx->id_len = sizeof(query_ctx->id);
|
|
|
- query_ctx->id_pointer = (const char *) &query_ctx->id;
|
|
|
- query_ctx->param.command = WORLD_TABLE_NAME;
|
|
|
- query_ctx->param.nParams = 1;
|
|
|
- query_ctx->param.on_error = on_database_error;
|
|
|
- query_ctx->param.on_result = on_result;
|
|
|
- query_ctx->param.on_timeout = on_database_timeout;
|
|
|
- query_ctx->param.paramFormats = &query_ctx->id_format;
|
|
|
- query_ctx->param.paramLengths = &query_ctx->id_len;
|
|
|
- query_ctx->param.paramValues = &query_ctx->id_pointer;
|
|
|
- query_ctx->param.flags = IS_PREPARED;
|
|
|
- query_ctx->param.resultFormat = 1;
|
|
|
- query_ctx->req = req;
|
|
|
-}
|
|
|
+ const query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t,
|
|
|
+ param,
|
|
|
+ param);
|
|
|
+ multiple_query_ctx_t * const query_ctx = query_param->ctx;
|
|
|
|
|
|
-static void on_database_error(db_query_param_t *param, const char *error_string)
|
|
|
-{
|
|
|
- single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
|
|
|
- param,
|
|
|
- param);
|
|
|
-
|
|
|
- send_error(BAD_GATEWAY, error_string, query_ctx->req);
|
|
|
-}
|
|
|
-
|
|
|
-static void on_database_timeout(db_query_param_t *param)
|
|
|
-{
|
|
|
- single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
|
|
|
- param,
|
|
|
- param);
|
|
|
+ if (query_ctx->gen) {
|
|
|
+ yajl_gen_free(query_ctx->gen);
|
|
|
+ query_ctx->gen = NULL;
|
|
|
+ send_error(BAD_GATEWAY, error_string, query_ctx->req);
|
|
|
+ }
|
|
|
|
|
|
- send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
|
|
|
+ query_ctx->num_query_in_progress--;
|
|
|
+ h2o_mem_release_shared(query_ctx);
|
|
|
}
|
|
|
|
|
|
static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result)
|
|
|
{
|
|
|
- multiple_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(multiple_query_ctx_t,
|
|
|
- single.param,
|
|
|
- param);
|
|
|
+ query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t,
|
|
|
+ param,
|
|
|
+ param);
|
|
|
+ multiple_query_ctx_t * const query_ctx = query_param->ctx;
|
|
|
|
|
|
- if (PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
|
+ if (query_ctx->gen && PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
|
thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
event_loop.h2o_ctx,
|
|
|
- query_ctx->single.req->conn->ctx);
|
|
|
- uint32_t * const random_number = &query_ctx->res[query_ctx->num_result++].random_number;
|
|
|
+ query_ctx->req->conn->ctx);
|
|
|
|
|
|
- assert(PQnfields(result) == 2);
|
|
|
- assert(PQntuples(result) == 1);
|
|
|
- assert(PQgetlength(result, 0, 1) == sizeof(*random_number));
|
|
|
- // Use memcpy() in case the result is not aligned.
|
|
|
- memcpy(random_number, PQgetvalue(result, 0, 1), sizeof(*random_number));
|
|
|
- *random_number = ntohl(*random_number);
|
|
|
- PQclear(result);
|
|
|
+ process_result(result, query_ctx->res + query_ctx->num_result);
|
|
|
+ query_ctx->num_query_in_progress--;
|
|
|
+ query_ctx->num_result++;
|
|
|
+
|
|
|
+ const size_t num_query_remaining = query_ctx->num_query - query_ctx->num_result;
|
|
|
+
|
|
|
+ if (query_ctx->num_query_in_progress < num_query_remaining) {
|
|
|
+ const size_t idx = query_ctx->num_result + query_ctx->num_query_in_progress;
|
|
|
|
|
|
- if (query_ctx->num_result < query_ctx->num_query) {
|
|
|
- query_ctx->single.id = htonl(query_ctx->res[query_ctx->num_result].id);
|
|
|
+ query_param->id = htonl(query_ctx->res[idx].id);
|
|
|
|
|
|
- if (!execute_query(ctx, &query_ctx->single.param))
|
|
|
+ if (!execute_query(ctx, &query_param->param)) {
|
|
|
+ query_ctx->num_query_in_progress++;
|
|
|
+ PQclear(result);
|
|
|
return DONE;
|
|
|
+ }
|
|
|
|
|
|
- send_service_unavailable_error(DB_REQ_ERROR, query_ctx->single.req);
|
|
|
+ yajl_gen_free(query_ctx->gen);
|
|
|
+ query_ctx->gen = NULL;
|
|
|
+ send_service_unavailable_error(DB_REQ_ERROR, query_ctx->req);
|
|
|
}
|
|
|
- else if (query_ctx->update_state == NO_UPDATE) {
|
|
|
+ else if (query_ctx->num_result == query_ctx->num_query)
|
|
|
serialize_items(query_ctx->res,
|
|
|
query_ctx->num_result,
|
|
|
query_ctx->gen,
|
|
|
- query_ctx->single.req);
|
|
|
- return DONE;
|
|
|
- }
|
|
|
- else {
|
|
|
- query_ctx->single.param.command = UPDATE_QUERY;
|
|
|
- query_ctx->single.param.nParams = 0;
|
|
|
- query_ctx->single.param.on_result = on_update_result;
|
|
|
- query_ctx->single.param.on_write_ready = on_update_write_ready;
|
|
|
- query_ctx->single.param.paramFormats = NULL;
|
|
|
- query_ctx->single.param.paramLengths = NULL;
|
|
|
- query_ctx->single.param.paramValues = NULL;
|
|
|
- query_ctx->single.param.flags = 0;
|
|
|
-
|
|
|
- if (!execute_query(ctx, &query_ctx->single.param))
|
|
|
- return DONE;
|
|
|
+ query_ctx->req);
|
|
|
|
|
|
- send_service_unavailable_error(DB_REQ_ERROR, query_ctx->single.req);
|
|
|
- }
|
|
|
- }
|
|
|
- else {
|
|
|
- send_error(BAD_GATEWAY, PQresultErrorMessage(result), query_ctx->single.req);
|
|
|
- PQclear(result);
|
|
|
+ h2o_mem_release_shared(query_ctx);
|
|
|
}
|
|
|
+ else
|
|
|
+ on_multiple_query_error(param, PQresultErrorMessage(result));
|
|
|
|
|
|
+ PQclear(result);
|
|
|
return DONE;
|
|
|
}
|
|
|
|
|
|
+static void on_multiple_query_timeout(db_query_param_t *param)
|
|
|
+{
|
|
|
+ const query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t,
|
|
|
+ param,
|
|
|
+ param);
|
|
|
+ multiple_query_ctx_t * const query_ctx = query_param->ctx;
|
|
|
+
|
|
|
+ if (query_ctx->gen) {
|
|
|
+ yajl_gen_free(query_ctx->gen);
|
|
|
+ query_ctx->gen = NULL;
|
|
|
+ send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
|
|
|
+ }
|
|
|
+
|
|
|
+ query_ctx->num_query_in_progress--;
|
|
|
+ h2o_mem_release_shared(query_ctx);
|
|
|
+}
|
|
|
+
|
|
|
+static void on_single_query_error(db_query_param_t *param, const char *error_string)
|
|
|
+{
|
|
|
+ single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
|
|
|
+ param,
|
|
|
+ param);
|
|
|
+
|
|
|
+ send_error(BAD_GATEWAY, error_string, query_ctx->req);
|
|
|
+}
|
|
|
+
|
|
|
static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result)
|
|
|
{
|
|
|
single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
|
|
@@ -294,114 +301,151 @@ static result_return_t on_single_query_result(db_query_param_t *param, PGresult
|
|
|
return DONE;
|
|
|
}
|
|
|
|
|
|
-static result_return_t on_update_result(db_query_param_t *param, PGresult *result)
|
|
|
+static void on_single_query_timeout(db_query_param_t *param)
|
|
|
{
|
|
|
- multiple_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(multiple_query_ctx_t,
|
|
|
- single.param,
|
|
|
- param);
|
|
|
- result_return_t ret = SUCCESS;
|
|
|
- const ExecStatusType status = PQresultStatus(result);
|
|
|
-
|
|
|
- switch (query_ctx->update_state) {
|
|
|
- case CREATE:
|
|
|
- case COPY_2:
|
|
|
- if (status != PGRES_COMMAND_OK)
|
|
|
- goto error;
|
|
|
-
|
|
|
- query_ctx->update_state++;
|
|
|
- break;
|
|
|
- case COPY_1:
|
|
|
- if (status != PGRES_COPY_IN)
|
|
|
- goto error;
|
|
|
-
|
|
|
- ret = WANT_WRITE;
|
|
|
- break;
|
|
|
- case UPDATE:
|
|
|
- if (status != PGRES_COMMAND_OK)
|
|
|
- goto error;
|
|
|
-
|
|
|
- serialize_items(query_ctx->res,
|
|
|
- query_ctx->num_result,
|
|
|
- query_ctx->gen,
|
|
|
- query_ctx->single.req);
|
|
|
- ret = DONE;
|
|
|
- break;
|
|
|
- default:
|
|
|
- goto error;
|
|
|
- }
|
|
|
+ single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
|
|
|
+ param,
|
|
|
+ param);
|
|
|
|
|
|
- PQclear(result);
|
|
|
- return ret;
|
|
|
-error:
|
|
|
- send_error(BAD_GATEWAY, PQresultErrorMessage(result), query_ctx->single.req);
|
|
|
- PQclear(result);
|
|
|
- return DONE;
|
|
|
+ send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
|
|
|
}
|
|
|
|
|
|
-static int on_update_write_ready(db_query_param_t *param, PGconn *db_conn)
|
|
|
+static void on_update_error(db_query_param_t *param, const char *error_string)
|
|
|
{
|
|
|
- multiple_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(multiple_query_ctx_t,
|
|
|
- single.param,
|
|
|
- param);
|
|
|
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
- event_loop.h2o_ctx,
|
|
|
- query_ctx->single.req->conn->ctx);
|
|
|
-
|
|
|
- if (query_ctx->num_result == query_ctx->num_query && query_ctx->update_state != COPY_2) {
|
|
|
- const int rc = PQputCopyData(db_conn, H2O_STRLIT(COPY_HEADER));
|
|
|
-
|
|
|
- if (!rc)
|
|
|
- return 1;
|
|
|
- else if (rc < 0)
|
|
|
- return rc;
|
|
|
-
|
|
|
- query_ctx->num_result = 0;
|
|
|
- query_ctx->update_state = COPY_2;
|
|
|
+ const update_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(update_param_t,
|
|
|
+ param,
|
|
|
+ param);
|
|
|
+ update_ctx_t * const update_ctx = query_param->ctx;
|
|
|
+
|
|
|
+ if (update_ctx->gen) {
|
|
|
+ yajl_gen_free(update_ctx->gen);
|
|
|
+ update_ctx->gen = NULL;
|
|
|
+ send_error(BAD_GATEWAY, error_string, update_ctx->req);
|
|
|
}
|
|
|
|
|
|
- if (query_ctx->num_result < query_ctx->num_query) {
|
|
|
- assert(query_ctx->num_query - query_ctx->num_result <= MAX_QUERIES);
|
|
|
-
|
|
|
- // There are at most MAX_QUERIES elements, so allocate on the stack.
|
|
|
- struct __attribute__ ((__packed__)) {
|
|
|
- uint16_t field_count;
|
|
|
- uint32_t id_size;
|
|
|
- uint32_t id;
|
|
|
- uint32_t random_number_size;
|
|
|
- uint32_t random_number;
|
|
|
- } data[query_ctx->num_query - query_ctx->num_result];
|
|
|
-
|
|
|
- memset(&data, 0, sizeof(data));
|
|
|
-
|
|
|
- for (size_t i = 0; i < query_ctx->num_query; i++) {
|
|
|
- query_ctx->res[i].random_number = get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
- data[i].field_count = htons(2);
|
|
|
- data[i].id_size = htonl(sizeof(data->id));
|
|
|
- data[i].id = htonl(query_ctx->res[i].id);
|
|
|
- data[i].random_number_size = htonl(sizeof(data->random_number));
|
|
|
- data[i].random_number = htonl(query_ctx->res[i].random_number);
|
|
|
- }
|
|
|
+ update_ctx->num_query_in_progress--;
|
|
|
+ h2o_mem_release_shared(update_ctx);
|
|
|
+}
|
|
|
|
|
|
- const int rc = PQputCopyData(db_conn, (const char *) &data, sizeof(data));
|
|
|
+static result_return_t on_update_result(db_query_param_t *param, PGresult *result)
|
|
|
+{
|
|
|
+ update_param_t * const update_param = H2O_STRUCT_FROM_MEMBER(update_param_t,
|
|
|
+ param,
|
|
|
+ param);
|
|
|
+ update_ctx_t * const update_ctx = update_param->ctx;
|
|
|
+ result_return_t ret = DONE;
|
|
|
+
|
|
|
+ if (update_ctx->gen) {
|
|
|
+ if (update_param->update) {
|
|
|
+ if (PQresultStatus(result) == PGRES_COMMAND_OK) {
|
|
|
+ thread_context_t * const ctx =
|
|
|
+ H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
+ event_loop.h2o_ctx,
|
|
|
+ update_ctx->req->conn->ctx);
|
|
|
+ const size_t num_query_remaining =
|
|
|
+ update_ctx->num_query - update_ctx->num_result;
|
|
|
+
|
|
|
+ update_ctx->num_query_in_progress--;
|
|
|
+
|
|
|
+ if (update_ctx->num_query_in_progress < num_query_remaining) {
|
|
|
+ const size_t idx = update_ctx->num_result +
|
|
|
+ update_ctx->num_query_in_progress;
|
|
|
+
|
|
|
+ update_param->random_number =
|
|
|
+ get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
+ update_param->update = false;
|
|
|
+ snprintf(update_param->command,
|
|
|
+ MAX_UPDATE_QUERY_LEN,
|
|
|
+ UPDATE_QUERY,
|
|
|
+ update_ctx->res[idx].id,
|
|
|
+ update_ctx->res[idx].id,
|
|
|
+ update_param->random_number);
|
|
|
+
|
|
|
+ if (!execute_query(ctx, &update_param->param)) {
|
|
|
+ update_ctx->num_query_in_progress++;
|
|
|
+ PQclear(result);
|
|
|
+ return DONE;
|
|
|
+ }
|
|
|
+
|
|
|
+ yajl_gen_free(update_ctx->gen);
|
|
|
+ update_ctx->gen = NULL;
|
|
|
+ send_service_unavailable_error(DB_REQ_ERROR,
|
|
|
+ update_ctx->req);
|
|
|
+ }
|
|
|
+ else if (update_ctx->num_result == update_ctx->num_query)
|
|
|
+ serialize_items(update_ctx->res,
|
|
|
+ update_ctx->num_result,
|
|
|
+ update_ctx->gen,
|
|
|
+ update_ctx->req);
|
|
|
+
|
|
|
+ h2o_mem_release_shared(update_ctx);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ on_update_error(param, PQresultErrorMessage(result));
|
|
|
+ }
|
|
|
+ else if (PQresultStatus(result) == PGRES_TUPLES_OK) {
|
|
|
+ process_result(result, update_ctx->res + update_ctx->num_result);
|
|
|
+ update_ctx->res[update_ctx->num_result].random_number =
|
|
|
+ update_param->random_number;
|
|
|
+ update_ctx->num_result++;
|
|
|
+ update_param->update = true;
|
|
|
+ ret = SUCCESS;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ on_update_error(param, PQresultErrorMessage(result));
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ update_ctx->num_query_in_progress--;
|
|
|
+ h2o_mem_release_shared(update_ctx);
|
|
|
+ }
|
|
|
|
|
|
- if (!rc)
|
|
|
- return 1;
|
|
|
- else if (rc < 0)
|
|
|
- return rc;
|
|
|
+ PQclear(result);
|
|
|
+ return ret;
|
|
|
+}
|
|
|
|
|
|
- query_ctx->num_result = query_ctx->num_query;
|
|
|
+static void on_update_timeout(db_query_param_t *param)
|
|
|
+{
|
|
|
+ const update_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(update_param_t,
|
|
|
+ param,
|
|
|
+ param);
|
|
|
+ update_ctx_t * const update_ctx = query_param->ctx;
|
|
|
+
|
|
|
+ if (update_ctx->gen) {
|
|
|
+ yajl_gen_free(update_ctx->gen);
|
|
|
+ update_ctx->gen = NULL;
|
|
|
+ send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, update_ctx->req);
|
|
|
}
|
|
|
|
|
|
- if (query_ctx->num_result == query_ctx->num_query) {
|
|
|
- const int rc = PQputCopyEnd(db_conn, NULL);
|
|
|
+ update_ctx->num_query_in_progress--;
|
|
|
+ h2o_mem_release_shared(update_ctx);
|
|
|
+}
|
|
|
|
|
|
- if (!rc)
|
|
|
- return 1;
|
|
|
- else if (rc < 0)
|
|
|
- return rc;
|
|
|
+static void process_result(PGresult *result, query_result_t *out)
|
|
|
+{
|
|
|
+ assert(PQnfields(result) == 2);
|
|
|
+ assert(PQntuples(result) == 1);
|
|
|
+
|
|
|
+ const char * const id = PQgetvalue(result, 0, 0);
|
|
|
+ const char * const random_number = PQgetvalue(result, 0, 1);
|
|
|
+
|
|
|
+ if (PQfformat(result, 0)) {
|
|
|
+ assert(PQgetlength(result, 0, 0) == sizeof(out->id));
|
|
|
+ // Use memcpy() in case the result is not aligned; the reason we are
|
|
|
+ // copying over the id is because the results may arrive in any order.
|
|
|
+ memcpy(&out->id, id, sizeof(out->id));
|
|
|
+ out->id = ntohl(out->id);
|
|
|
}
|
|
|
+ else
|
|
|
+ out->id = atoi(id);
|
|
|
|
|
|
- return PQflush(db_conn);
|
|
|
+ if (PQfformat(result, 1)) {
|
|
|
+ assert(PQgetlength(result, 0, 1) == sizeof(out->random_number));
|
|
|
+ // Use memcpy() in case the result is not aligned.
|
|
|
+ memcpy(&out->random_number, random_number, sizeof(out->random_number));
|
|
|
+ out->random_number = ntohl(out->random_number);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ out->random_number = atoi(random_number);
|
|
|
}
|
|
|
|
|
|
static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen)
|
|
@@ -422,22 +466,16 @@ static void serialize_items(const query_result_t *res,
|
|
|
yajl_gen gen,
|
|
|
h2o_req_t *req)
|
|
|
{
|
|
|
- // In principle the JSON generator can be created here, but we do it earlier,
|
|
|
- // so that it happens in parallel with the database query; we assume that the
|
|
|
- // allocation will rarely fail, so that the delayed error handling is not
|
|
|
- // problematic.
|
|
|
- if (gen) {
|
|
|
- CHECK_YAJL_STATUS(yajl_gen_array_open, gen);
|
|
|
+ CHECK_YAJL_STATUS(yajl_gen_array_open, gen);
|
|
|
|
|
|
- for (size_t i = 0; i < num_result; i++)
|
|
|
- if (serialize_item(res[i].id, res[i].random_number, gen))
|
|
|
- goto error_yajl;
|
|
|
+ for (size_t i = 0; i < num_result; i++)
|
|
|
+ if (serialize_item(res[i].id, res[i].random_number, gen))
|
|
|
+ goto error_yajl;
|
|
|
|
|
|
- CHECK_YAJL_STATUS(yajl_gen_array_close, gen);
|
|
|
+ CHECK_YAJL_STATUS(yajl_gen_array_close, gen);
|
|
|
|
|
|
- if (!send_json_response(gen, false, req))
|
|
|
- return;
|
|
|
- }
|
|
|
+ if (!send_json_response(gen, false, req))
|
|
|
+ return;
|
|
|
|
|
|
error_yajl:
|
|
|
send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
|
|
@@ -447,7 +485,73 @@ int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
|
{
|
|
|
IGNORE_FUNCTION_PARAMETER(self);
|
|
|
|
|
|
- return do_multiple_queries(NO_UPDATE, req);
|
|
|
+ thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
+ event_loop.h2o_ctx,
|
|
|
+ req->conn->ctx);
|
|
|
+
|
|
|
+ const size_t num_query = get_query_number(req);
|
|
|
+ size_t base_size = offsetof(multiple_query_ctx_t, res) + num_query * sizeof(query_result_t);
|
|
|
+
|
|
|
+ base_size = ((base_size + _Alignof(query_param_t) - 1) / _Alignof(query_param_t));
|
|
|
+ base_size = base_size * _Alignof(query_param_t);
|
|
|
+
|
|
|
+ const size_t num_query_in_progress = MIN(num_query, ctx->config->max_db_conn_num);
|
|
|
+ const size_t sz = base_size + num_query_in_progress * sizeof(query_param_t);
|
|
|
+
|
|
|
+ multiple_query_ctx_t * const query_ctx = h2o_mem_alloc_shared(&req->pool,
|
|
|
+ sz,
|
|
|
+ cleanup_multiple_query);
|
|
|
+
|
|
|
+ if (query_ctx) {
|
|
|
+ memset(query_ctx, 0, sz);
|
|
|
+ query_ctx->num_query = num_query;
|
|
|
+ query_ctx->num_query_in_progress = num_query_in_progress;
|
|
|
+ query_ctx->req = req;
|
|
|
+ query_ctx->query_param = (query_param_t *) ((char *) query_ctx + base_size);
|
|
|
+ initialize_ids(num_query, query_ctx->res, &ctx->random_seed);
|
|
|
+
|
|
|
+ for (size_t i = 0; i < query_ctx->num_query_in_progress; i++) {
|
|
|
+ query_ctx->query_param[i].ctx = query_ctx;
|
|
|
+ // We need a copy of id because the original may be overwritten
|
|
|
+ // by a completed query.
|
|
|
+ query_ctx->query_param[i].id = htonl(query_ctx->res[i].id);
|
|
|
+ query_ctx->query_param[i].id_format = 1;
|
|
|
+ query_ctx->query_param[i].id_len = sizeof(query_ctx->query_param[i].id);
|
|
|
+ query_ctx->query_param[i].id_pointer =
|
|
|
+ (const char *) &query_ctx->query_param[i].id;
|
|
|
+ query_ctx->query_param[i].param.command = WORLD_TABLE_NAME;
|
|
|
+ query_ctx->query_param[i].param.nParams = 1;
|
|
|
+ query_ctx->query_param[i].param.on_error = on_multiple_query_error;
|
|
|
+ query_ctx->query_param[i].param.on_result = on_multiple_query_result;
|
|
|
+ query_ctx->query_param[i].param.on_timeout = on_multiple_query_timeout;
|
|
|
+ query_ctx->query_param[i].param.paramFormats =
|
|
|
+ &query_ctx->query_param[i].id_format;
|
|
|
+ query_ctx->query_param[i].param.paramLengths =
|
|
|
+ &query_ctx->query_param[i].id_len;
|
|
|
+ query_ctx->query_param[i].param.paramValues =
|
|
|
+ &query_ctx->query_param[i].id_pointer;
|
|
|
+ query_ctx->query_param[i].param.flags = IS_PREPARED;
|
|
|
+ query_ctx->query_param[i].param.resultFormat = 1;
|
|
|
+
|
|
|
+ if (execute_query(ctx, &query_ctx->query_param[i].param)) {
|
|
|
+ query_ctx->num_query_in_progress = i;
|
|
|
+ send_service_unavailable_error(DB_REQ_ERROR, req);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ h2o_mem_addref_shared(query_ctx);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create a JSON generator while the queries are processed.
|
|
|
+ query_ctx->gen = get_json_generator(&req->pool);
|
|
|
+
|
|
|
+ if (!query_ctx->gen)
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
|
|
|
+
|
|
|
+ return 0;
|
|
|
}
|
|
|
|
|
|
int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
@@ -460,8 +564,22 @@ int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
|
single_query_ctx_t * const query_ctx = h2o_mem_alloc_pool(&req->pool, sizeof(*query_ctx));
|
|
|
|
|
|
if (query_ctx) {
|
|
|
- initialize_single_query_context(req, on_single_query_result, query_ctx);
|
|
|
+ memset(query_ctx, 0, sizeof(*query_ctx));
|
|
|
query_ctx->id = htonl(get_random_number(MAX_ID, &ctx->random_seed) + 1);
|
|
|
+ query_ctx->id_format = 1;
|
|
|
+ query_ctx->id_len = sizeof(query_ctx->id);
|
|
|
+ query_ctx->id_pointer = (const char *) &query_ctx->id;
|
|
|
+ query_ctx->param.command = WORLD_TABLE_NAME;
|
|
|
+ query_ctx->param.nParams = 1;
|
|
|
+ query_ctx->param.on_error = on_single_query_error;
|
|
|
+ query_ctx->param.on_result = on_single_query_result;
|
|
|
+ query_ctx->param.on_timeout = on_single_query_timeout;
|
|
|
+ query_ctx->param.paramFormats = &query_ctx->id_format;
|
|
|
+ query_ctx->param.paramLengths = &query_ctx->id_len;
|
|
|
+ query_ctx->param.paramValues = &query_ctx->id_pointer;
|
|
|
+ query_ctx->param.flags = IS_PREPARED;
|
|
|
+ query_ctx->param.resultFormat = 1;
|
|
|
+ query_ctx->req = req;
|
|
|
|
|
|
if (execute_query(ctx, &query_ctx->param))
|
|
|
send_service_unavailable_error(DB_REQ_ERROR, req);
|
|
@@ -476,5 +594,67 @@ int updates(struct st_h2o_handler_t *self, h2o_req_t *req)
|
|
|
{
|
|
|
IGNORE_FUNCTION_PARAMETER(self);
|
|
|
|
|
|
- return do_multiple_queries(CREATE, req);
|
|
|
+ thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
|
|
|
+ event_loop.h2o_ctx,
|
|
|
+ req->conn->ctx);
|
|
|
+
|
|
|
+ const size_t num_query = get_query_number(req);
|
|
|
+ size_t base_size = offsetof(update_ctx_t, res) + num_query * sizeof(query_result_t);
|
|
|
+
|
|
|
+ base_size = ((base_size + _Alignof(update_param_t) - 1) / _Alignof(update_param_t));
|
|
|
+ base_size = base_size * _Alignof(update_param_t);
|
|
|
+
|
|
|
+ const size_t num_query_in_progress = MIN(num_query, ctx->config->max_db_conn_num);
|
|
|
+ const size_t struct_size = base_size + num_query_in_progress * sizeof(update_param_t);
|
|
|
+ const size_t sz = struct_size + num_query_in_progress * MAX_UPDATE_QUERY_LEN;
|
|
|
+ update_ctx_t * const update_ctx = h2o_mem_alloc_shared(&req->pool, sz, cleanup_update);
|
|
|
+
|
|
|
+ if (update_ctx) {
|
|
|
+ memset(update_ctx, 0, struct_size);
|
|
|
+ update_ctx->num_query = num_query;
|
|
|
+ update_ctx->num_query_in_progress = num_query_in_progress;
|
|
|
+ update_ctx->req = req;
|
|
|
+ update_ctx->update_param = (update_param_t *) ((char *) update_ctx + base_size);
|
|
|
+ initialize_ids(num_query, update_ctx->res, &ctx->random_seed);
|
|
|
+
|
|
|
+ char *command = (char *) update_ctx + struct_size;
|
|
|
+
|
|
|
+ for (size_t i = 0; i < update_ctx->num_query_in_progress; i++) {
|
|
|
+ update_ctx->update_param[i].command = command;
|
|
|
+ command += MAX_UPDATE_QUERY_LEN;
|
|
|
+ update_ctx->update_param[i].ctx = update_ctx;
|
|
|
+ update_ctx->update_param[i].param.command =
|
|
|
+ update_ctx->update_param[i].command;
|
|
|
+ update_ctx->update_param[i].param.on_error = on_update_error;
|
|
|
+ update_ctx->update_param[i].param.on_result = on_update_result;
|
|
|
+ update_ctx->update_param[i].param.on_timeout = on_update_timeout;
|
|
|
+ update_ctx->update_param[i].param.resultFormat = 1;
|
|
|
+ update_ctx->update_param[i].random_number =
|
|
|
+ get_random_number(MAX_ID, &ctx->random_seed) + 1;
|
|
|
+ snprintf(update_ctx->update_param[i].command,
|
|
|
+ MAX_UPDATE_QUERY_LEN,
|
|
|
+ UPDATE_QUERY,
|
|
|
+ update_ctx->res[i].id,
|
|
|
+ update_ctx->res[i].id,
|
|
|
+ update_ctx->update_param[i].random_number);
|
|
|
+
|
|
|
+ if (execute_query(ctx, &update_ctx->update_param[i].param)) {
|
|
|
+ update_ctx->num_query_in_progress = i;
|
|
|
+ send_service_unavailable_error(DB_REQ_ERROR, req);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ h2o_mem_addref_shared(update_ctx);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create a JSON generator while the queries are processed.
|
|
|
+ update_ctx->gen = get_json_generator(&req->pool);
|
|
|
+
|
|
|
+ if (!update_ctx->gen)
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
|
|
|
+
|
|
|
+ return 0;
|
|
|
}
|