Browse Source

Merge pull request #2406 from volyrique/master

H2O: Optimize the multiple queries and database updates tests
knewmanTE 8 years ago
parent
commit
2ed50199de

+ 4 - 2
frameworks/C/h2o/src/database.c

@@ -66,8 +66,10 @@ static const struct {
 } prepared_statement[] = {
 } prepared_statement[] = {
 	{FORTUNE_TABLE_NAME, "SELECT * FROM " FORTUNE_TABLE_NAME ";"},
 	{FORTUNE_TABLE_NAME, "SELECT * FROM " FORTUNE_TABLE_NAME ";"},
 	{WORLD_TABLE_NAME,
 	{WORLD_TABLE_NAME,
-	 "SELECT * FROM " WORLD_TABLE_NAME " "
-	 "WHERE " WORLD_TABLE_NAME "." ID_FIELD_NAME " = $1::integer;"},
+	 "SELECT * FROM " WORLD_TABLE_NAME " WHERE " ID_FIELD_NAME " = $1::integer;"},
+	{UPDATE_QUERY_NAME,
+	 "UPDATE " WORLD_TABLE_NAME " SET randomNumber = $2::integer "
+	 "WHERE " ID_FIELD_NAME " = $1::integer;"},
 };
 };
 
 
 static void do_execute_query(db_conn_t *db_conn)
 static void do_execute_query(db_conn_t *db_conn)

+ 8 - 5
frameworks/C/h2o/src/database.h

@@ -22,12 +22,13 @@
 #define DATABASE_H_
 #define DATABASE_H_
 
 
 #include <h2o.h>
 #include <h2o.h>
+#include <inttypes.h>
 #include <stdint.h>
 #include <stdint.h>
 #include <postgresql/libpq-fe.h>
 #include <postgresql/libpq-fe.h>
 
 
 #include "list.h"
 #include "list.h"
+#include "utility.h"
 
 
-#define COPY_HEADER "PGCOPY\n\377\r\n\0\0\0\0\0\0\0\0\0"
 #define DB_REQ_ERROR "too many concurrent database requests\n"
 #define DB_REQ_ERROR "too many concurrent database requests\n"
 #define DB_TIMEOUT_ERROR "database timeout\n"
 #define DB_TIMEOUT_ERROR "database timeout\n"
 #define FORTUNE_TABLE_NAME "Fortune"
 #define FORTUNE_TABLE_NAME "Fortune"
@@ -36,13 +37,15 @@
 #define IS_SINGLE_ROW 2
 #define IS_SINGLE_ROW 2
 #define MAX_ID 10000
 #define MAX_ID 10000
 #define MESSAGE_FIELD_NAME "message"
 #define MESSAGE_FIELD_NAME "message"
+#define UPDATE_QUERY_NAME "Update"
 #define WORLD_TABLE_NAME "World"
 #define WORLD_TABLE_NAME "World"
 
 
 #define UPDATE_QUERY \
 #define UPDATE_QUERY \
-	"CREATE TEMP TABLE input(LIKE World INCLUDING ALL) ON COMMIT DROP;" \
-	"COPY input FROM STDIN WITH (FORMAT binary);" \
-	"UPDATE World SET randomNumber = input.randomNumber FROM input " \
-	"WHERE World." ID_FIELD_NAME " = input." ID_FIELD_NAME ";"
+	"EXECUTE \"" WORLD_TABLE_NAME "\"(%" PRIu32 ");" \
+	"EXECUTE \"" UPDATE_QUERY_NAME "\"(%" PRIu32 ", %" PRIu32 ");"
+
+#define MAX_UPDATE_QUERY_LEN \
+	(sizeof(UPDATE_QUERY) + 3 * (sizeof(MKSTR(MAX_ID)) - 1) - 3 * sizeof(PRIu32))
 
 
 typedef enum {
 typedef enum {
 	SUCCESS,
 	SUCCESS,

+ 2 - 1
frameworks/C/h2o/src/fortune.c

@@ -102,6 +102,7 @@ static uintmax_t add_iovec(mustache_api_t *api,
 	}
 	}
 
 
 	if (ret) {
 	if (ret) {
+		memset(iovec_list->iov + iovec_list->iovcnt, 0, sizeof(*iovec_list->iov));
 		iovec_list->iov[iovec_list->iovcnt].base = (char *) buffer;
 		iovec_list->iov[iovec_list->iovcnt].base = (char *) buffer;
 		iovec_list->iov[iovec_list->iovcnt++].len = buffer_size;
 		iovec_list->iov[iovec_list->iovcnt++].len = buffer_size;
 		fortune_ctx->content_length += buffer_size;
 		fortune_ctx->content_length += buffer_size;
@@ -263,7 +264,7 @@ static result_return_t on_fortune_result(db_query_param_t *param, PGresult *resu
 		const size_t iovcnt = MIN(MAX_IOVEC, fortune_ctx->num_result * 5 + 2);
 		const size_t iovcnt = MIN(MAX_IOVEC, fortune_ctx->num_result * 5 + 2);
 		const size_t sz = offsetof(iovec_list_t, iov) + iovcnt * sizeof(h2o_iovec_t);
 		const size_t sz = offsetof(iovec_list_t, iov) + iovcnt * sizeof(h2o_iovec_t);
 		char _Alignas(iovec_list_t) mem[sz];
 		char _Alignas(iovec_list_t) mem[sz];
-		iovec_list_t * const iovec_list = (iovec_list_t *) mem;
+		iovec_list_t * const restrict iovec_list = (iovec_list_t *) mem;
 
 
 		memset(iovec_list, 0, offsetof(iovec_list_t, iov));
 		memset(iovec_list, 0, offsetof(iovec_list_t, iov));
 		iovec_list->max_iovcnt = iovcnt;
 		iovec_list->max_iovcnt = iovcnt;

+ 419 - 239
frameworks/C/h2o/src/world.c

@@ -19,8 +19,10 @@
 
 
 #include <assert.h>
 #include <assert.h>
 #include <h2o.h>
 #include <h2o.h>
+#include <stdbool.h>
 #include <stddef.h>
 #include <stddef.h>
 #include <stdint.h>
 #include <stdint.h>
+#include <stdio.h>
 #include <arpa/inet.h>
 #include <arpa/inet.h>
 #include <postgresql/libpq-fe.h>
 #include <postgresql/libpq-fe.h>
 #include <yajl/yajl_gen.h>
 #include <yajl/yajl_gen.h>
@@ -38,13 +40,8 @@
 #define QUERIES_PARAMETER "queries="
 #define QUERIES_PARAMETER "queries="
 #define RANDOM_NUM_KEY "randomNumber"
 #define RANDOM_NUM_KEY "randomNumber"
 
 
-typedef enum {
-	NO_UPDATE = 0,
-	CREATE,
-	COPY_1,
-	COPY_2,
-	UPDATE
-} update_state_t;
+typedef struct multiple_query_ctx_t multiple_query_ctx_t;
+typedef struct update_ctx_t update_ctx_t;
 
 
 typedef struct {
 typedef struct {
 	uint32_t id;
 	uint32_t id;
@@ -52,41 +49,72 @@ typedef struct {
 } query_result_t;
 } query_result_t;
 
 
 typedef struct {
 typedef struct {
+	multiple_query_ctx_t *ctx;
+	const char *id_pointer;
+	uint32_t id;
+	int id_format;
+	int id_len;
 	db_query_param_t param;
 	db_query_param_t param;
+} query_param_t;
+
+typedef struct {
 	const char *id_pointer;
 	const char *id_pointer;
 	h2o_req_t *req;
 	h2o_req_t *req;
 	uint32_t id;
 	uint32_t id;
 	int id_format;
 	int id_format;
 	int id_len;
 	int id_len;
+	db_query_param_t param;
 } single_query_ctx_t;
 } single_query_ctx_t;
 
 
+struct multiple_query_ctx_t {
+	yajl_gen gen; // also an error flag
+	h2o_req_t *req;
+	query_param_t *query_param;
+	size_t num_query;
+	size_t num_query_in_progress;
+	size_t num_result;
+	query_result_t res[];
+};
+
 typedef struct {
 typedef struct {
-	single_query_ctx_t single;
-	yajl_gen gen;
+	char *command;
+	update_ctx_t *ctx;
+	uint32_t random_number;
+	bool update;
+	db_query_param_t param;
+} update_param_t;
+
+struct update_ctx_t {
+	yajl_gen gen; // also an error flag
+	h2o_req_t *req;
+	update_param_t *update_param;
 	size_t num_query;
 	size_t num_query;
+	size_t num_query_in_progress;
 	size_t num_result;
 	size_t num_result;
-	update_state_t update_state;
 	query_result_t res[];
 	query_result_t res[];
-} multiple_query_ctx_t;
-
-static void cleanup_request(void *data);
-static int do_multiple_queries(update_state_t update_state, h2o_req_t *req);
-static void initialize_single_query_context(h2o_req_t *req,
-                                            on_result_t on_result,
-                                            single_query_ctx_t *query_ctx);
-static void on_database_error(db_query_param_t *param, const char *error_string);
-static void on_database_timeout(db_query_param_t *param);
+};
+
+static void cleanup_multiple_query(void *data);
+static void cleanup_update(void *data);
+static size_t get_query_number(h2o_req_t *req);
+static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed);
+static void on_multiple_query_error(db_query_param_t *param, const char *error_string);
 static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result);
 static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result);
+static void on_multiple_query_timeout(db_query_param_t *param);
+static void on_single_query_error(db_query_param_t *param, const char *error_string);
 static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result);
 static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result);
+static void on_single_query_timeout(db_query_param_t *param);
+static void on_update_error(db_query_param_t *param, const char *error_string);
 static result_return_t on_update_result(db_query_param_t *param, PGresult *result);
 static result_return_t on_update_result(db_query_param_t *param, PGresult *result);
-static int on_update_write_ready(db_query_param_t *param, PGconn *db_conn);
+static void on_update_timeout(db_query_param_t *param);
+static void process_result(PGresult *result, query_result_t *out);
 static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen);
 static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen);
 static void serialize_items(const query_result_t *res,
 static void serialize_items(const query_result_t *res,
                             size_t num_result,
                             size_t num_result,
                             yajl_gen gen,
                             yajl_gen gen,
                             h2o_req_t *req);
                             h2o_req_t *req);
 
 
-static void cleanup_request(void *data)
+static void cleanup_multiple_query(void *data)
 {
 {
 	const multiple_query_ctx_t * const query_ctx = data;
 	const multiple_query_ctx_t * const query_ctx = data;
 
 
@@ -94,11 +122,16 @@ static void cleanup_request(void *data)
 		yajl_gen_free(query_ctx->gen);
 		yajl_gen_free(query_ctx->gen);
 }
 }
 
 
-static int do_multiple_queries(update_state_t update_state, h2o_req_t *req)
+static void cleanup_update(void *data)
+{
+	const update_ctx_t * const update_ctx = data;
+
+	if (update_ctx->gen)
+		yajl_gen_free(update_ctx->gen);
+}
+
+static size_t get_query_number(h2o_req_t *req)
 {
 {
-	thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
-	                                                      event_loop.h2o_ctx,
-	                                                      req->conn->ctx);
 	int num_query = 0;
 	int num_query = 0;
 
 
 	if (req->query_at < SIZE_MAX) {
 	if (req->query_at < SIZE_MAX) {
@@ -116,144 +149,118 @@ static int do_multiple_queries(update_state_t update_state, h2o_req_t *req)
 	else if (num_query > MAX_QUERIES)
 	else if (num_query > MAX_QUERIES)
 		num_query = MAX_QUERIES;
 		num_query = MAX_QUERIES;
 
 
-	const size_t sz = offsetof(multiple_query_ctx_t, res) + num_query * sizeof(query_result_t);
-	multiple_query_ctx_t * const query_ctx = h2o_mem_alloc_shared(&req->pool, sz, cleanup_request);
-
-	if (query_ctx) {
-		// MAX_ID is a relatively small number, so allocate on the stack.
-		DEFINE_BITSET(bitset, MAX_ID);
-
-		initialize_single_query_context(req, on_multiple_query_result, &query_ctx->single);
-		memset(&query_ctx->single + 1,
-		       0,
-		       offsetof(multiple_query_ctx_t, res) - sizeof(query_ctx->single));
-		query_ctx->num_query = num_query;
-		query_ctx->update_state = update_state;
-
-		size_t max_rand = MAX_ID - query_ctx->num_query + 1;
+	return num_query;
+}
 
 
-		for (size_t i = 0; i < query_ctx->num_query; i++) {
-			query_ctx->res[i].id = get_random_number(max_rand, &ctx->random_seed);
+static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed)
+{
+	// MAX_ID is a relatively small number, so allocate on the stack.
+	DEFINE_BITSET(bitset, MAX_ID);
 
 
-			if (BITSET_ISSET(query_ctx->res[i].id, bitset))
-				query_ctx->res[i].id = max_rand - 1;
+	size_t max_rand = MAX_ID - num_query + 1;
 
 
-			BITSET_SET(query_ctx->res[i].id++, bitset);
-			max_rand++;
-		}
+	for (size_t i = 0; i < num_query; i++) {
+		res[i].id = get_random_number(max_rand, seed);
 
 
-		query_ctx->single.id = htonl(query_ctx->res->id);
+		if (BITSET_ISSET(res[i].id, bitset))
+			res[i].id = max_rand - 1;
 
 
-		if (execute_query(ctx, &query_ctx->single.param))
-			send_service_unavailable_error(DB_REQ_ERROR, req);
-		else
-			// Create a JSON generator while the query is processed.
-			query_ctx->gen = get_json_generator(&req->pool);
+		BITSET_SET(res[i].id++, bitset);
+		max_rand++;
 	}
 	}
-	else
-		send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
-
-	return 0;
 }
 }
 
 
-static void initialize_single_query_context(h2o_req_t *req,
-                                            on_result_t on_result,
-                                            single_query_ctx_t *query_ctx)
+static void on_multiple_query_error(db_query_param_t *param, const char *error_string)
 {
 {
-	memset(query_ctx, 0, sizeof(*query_ctx));
-	query_ctx->id_format = 1;
-	query_ctx->id_len = sizeof(query_ctx->id);
-	query_ctx->id_pointer = (const char *) &query_ctx->id;
-	query_ctx->param.command = WORLD_TABLE_NAME;
-	query_ctx->param.nParams = 1;
-	query_ctx->param.on_error = on_database_error;
-	query_ctx->param.on_result = on_result;
-	query_ctx->param.on_timeout = on_database_timeout;
-	query_ctx->param.paramFormats = &query_ctx->id_format;
-	query_ctx->param.paramLengths = &query_ctx->id_len;
-	query_ctx->param.paramValues = &query_ctx->id_pointer;
-	query_ctx->param.flags = IS_PREPARED;
-	query_ctx->param.resultFormat = 1;
-	query_ctx->req = req;
-}
+	const query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t,
+	                                                                 param,
+	                                                                 param);
+	multiple_query_ctx_t * const query_ctx = query_param->ctx;
 
 
-static void on_database_error(db_query_param_t *param, const char *error_string)
-{
-	single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
-	                                                              param,
-	                                                              param);
-
-	send_error(BAD_GATEWAY, error_string, query_ctx->req);
-}
-
-static void on_database_timeout(db_query_param_t *param)
-{
-	single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
-	                                                              param,
-	                                                              param);
+	if (query_ctx->gen) {
+		yajl_gen_free(query_ctx->gen);
+		query_ctx->gen = NULL;
+		send_error(BAD_GATEWAY, error_string, query_ctx->req);
+	}
 
 
-	send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
+	query_ctx->num_query_in_progress--;
+	h2o_mem_release_shared(query_ctx);
 }
 }
 
 
 static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result)
 static result_return_t on_multiple_query_result(db_query_param_t *param, PGresult *result)
 {
 {
-	multiple_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(multiple_query_ctx_t,
-	                                                                single.param,
-	                                                                param);
+	query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t,
+	                                                           param,
+	                                                           param);
+	multiple_query_ctx_t * const query_ctx = query_param->ctx;
 
 
-	if (PQresultStatus(result) == PGRES_TUPLES_OK) {
+	if (query_ctx->gen && PQresultStatus(result) == PGRES_TUPLES_OK) {
 		thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
 		thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
 		                                                      event_loop.h2o_ctx,
 		                                                      event_loop.h2o_ctx,
-		                                                      query_ctx->single.req->conn->ctx);
-		uint32_t * const random_number = &query_ctx->res[query_ctx->num_result++].random_number;
+		                                                      query_ctx->req->conn->ctx);
 
 
-		assert(PQnfields(result) == 2);
-		assert(PQntuples(result) == 1);
-		assert(PQgetlength(result, 0, 1) == sizeof(*random_number));
-		// Use memcpy() in case the result is not aligned.
-		memcpy(random_number, PQgetvalue(result, 0, 1), sizeof(*random_number));
-		*random_number = ntohl(*random_number);
-		PQclear(result);
+		process_result(result, query_ctx->res + query_ctx->num_result);
+		query_ctx->num_query_in_progress--;
+		query_ctx->num_result++;
+
+		const size_t num_query_remaining = query_ctx->num_query - query_ctx->num_result;
+
+		if (query_ctx->num_query_in_progress < num_query_remaining) {
+			const size_t idx = query_ctx->num_result + query_ctx->num_query_in_progress;
 
 
-		if (query_ctx->num_result < query_ctx->num_query) {
-			query_ctx->single.id = htonl(query_ctx->res[query_ctx->num_result].id);
+			query_param->id = htonl(query_ctx->res[idx].id);
 
 
-			if (!execute_query(ctx, &query_ctx->single.param))
+			if (!execute_query(ctx, &query_param->param)) {
+				query_ctx->num_query_in_progress++;
+				PQclear(result);
 				return DONE;
 				return DONE;
+			}
 
 
-			send_service_unavailable_error(DB_REQ_ERROR, query_ctx->single.req);
+			yajl_gen_free(query_ctx->gen);
+			query_ctx->gen = NULL;
+			send_service_unavailable_error(DB_REQ_ERROR, query_ctx->req);
 		}
 		}
-		else if (query_ctx->update_state == NO_UPDATE) {
+		else if (query_ctx->num_result == query_ctx->num_query)
 			serialize_items(query_ctx->res,
 			serialize_items(query_ctx->res,
 			                query_ctx->num_result,
 			                query_ctx->num_result,
 			                query_ctx->gen,
 			                query_ctx->gen,
-			                query_ctx->single.req);
-			return DONE;
-		}
-		else {
-			query_ctx->single.param.command = UPDATE_QUERY;
-			query_ctx->single.param.nParams = 0;
-			query_ctx->single.param.on_result = on_update_result;
-			query_ctx->single.param.on_write_ready = on_update_write_ready;
-			query_ctx->single.param.paramFormats = NULL;
-			query_ctx->single.param.paramLengths = NULL;
-			query_ctx->single.param.paramValues = NULL;
-			query_ctx->single.param.flags = 0;
-
-			if (!execute_query(ctx, &query_ctx->single.param))
-				return DONE;
+			                query_ctx->req);
 
 
-			send_service_unavailable_error(DB_REQ_ERROR, query_ctx->single.req);
-		}
-	}
-	else {
-		send_error(BAD_GATEWAY, PQresultErrorMessage(result), query_ctx->single.req);
-		PQclear(result);
+		h2o_mem_release_shared(query_ctx);
 	}
 	}
+	else
+		on_multiple_query_error(param, PQresultErrorMessage(result));
 
 
+	PQclear(result);
 	return DONE;
 	return DONE;
 }
 }
 
 
+static void on_multiple_query_timeout(db_query_param_t *param)
+{
+	const query_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(query_param_t,
+	                                                                 param,
+	                                                                 param);
+	multiple_query_ctx_t * const query_ctx = query_param->ctx;
+
+	if (query_ctx->gen) {
+		yajl_gen_free(query_ctx->gen);
+		query_ctx->gen = NULL;
+		send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
+	}
+
+	query_ctx->num_query_in_progress--;
+	h2o_mem_release_shared(query_ctx);
+}
+
+static void on_single_query_error(db_query_param_t *param, const char *error_string)
+{
+	single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
+	                                                              param,
+	                                                              param);
+
+	send_error(BAD_GATEWAY, error_string, query_ctx->req);
+}
+
 static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result)
 static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result)
 {
 {
 	single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
 	single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
@@ -294,114 +301,151 @@ static result_return_t on_single_query_result(db_query_param_t *param, PGresult
 	return DONE;
 	return DONE;
 }
 }
 
 
-static result_return_t on_update_result(db_query_param_t *param, PGresult *result)
+static void on_single_query_timeout(db_query_param_t *param)
 {
 {
-	multiple_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(multiple_query_ctx_t,
-	                                                                single.param,
-	                                                                param);
-	result_return_t ret = SUCCESS;
-	const ExecStatusType status = PQresultStatus(result);
-
-	switch (query_ctx->update_state) {
-		case CREATE:
-		case COPY_2:
-			if (status != PGRES_COMMAND_OK)
-				goto error;
-
-			query_ctx->update_state++;
-			break;
-		case COPY_1:
-			if (status != PGRES_COPY_IN)
-				goto error;
-
-			ret = WANT_WRITE;
-			break;
-		case UPDATE:
-			if (status != PGRES_COMMAND_OK)
-				goto error;
-
-			serialize_items(query_ctx->res,
-			                query_ctx->num_result,
-			                query_ctx->gen,
-			                query_ctx->single.req);
-			ret = DONE;
-			break;
-		default:
-			goto error;
-	}
+	single_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(single_query_ctx_t,
+	                                                              param,
+	                                                              param);
 
 
-	PQclear(result);
-	return ret;
-error:
-	send_error(BAD_GATEWAY, PQresultErrorMessage(result), query_ctx->single.req);
-	PQclear(result);
-	return DONE;
+	send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, query_ctx->req);
 }
 }
 
 
-static int on_update_write_ready(db_query_param_t *param, PGconn *db_conn)
+static void on_update_error(db_query_param_t *param, const char *error_string)
 {
 {
-	multiple_query_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(multiple_query_ctx_t,
-	                                                                single.param,
-	                                                                param);
-	thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
-	                                                      event_loop.h2o_ctx,
-	                                                      query_ctx->single.req->conn->ctx);
-
-	if (query_ctx->num_result == query_ctx->num_query && query_ctx->update_state != COPY_2) {
-		const int rc = PQputCopyData(db_conn, H2O_STRLIT(COPY_HEADER));
-
-		if (!rc)
-			return 1;
-		else if (rc < 0)
-			return rc;
-
-		query_ctx->num_result = 0;
-		query_ctx->update_state = COPY_2;
+	const update_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(update_param_t,
+	                                                                  param,
+	                                                                  param);
+	update_ctx_t * const update_ctx = query_param->ctx;
+
+	if (update_ctx->gen) {
+		yajl_gen_free(update_ctx->gen);
+		update_ctx->gen = NULL;
+		send_error(BAD_GATEWAY, error_string, update_ctx->req);
 	}
 	}
 
 
-	if (query_ctx->num_result < query_ctx->num_query) {
-		assert(query_ctx->num_query - query_ctx->num_result <= MAX_QUERIES);
-
-		// There are at most MAX_QUERIES elements, so allocate on the stack.
-		struct __attribute__ ((__packed__)) {
-			uint16_t field_count;
-			uint32_t id_size;
-			uint32_t id;
-			uint32_t random_number_size;
-			uint32_t random_number;
-		} data[query_ctx->num_query - query_ctx->num_result];
-
-		memset(&data, 0, sizeof(data));
-
-		for (size_t i = 0; i < query_ctx->num_query; i++) {
-			query_ctx->res[i].random_number = get_random_number(MAX_ID, &ctx->random_seed) + 1;
-			data[i].field_count = htons(2);
-			data[i].id_size = htonl(sizeof(data->id));
-			data[i].id = htonl(query_ctx->res[i].id);
-			data[i].random_number_size = htonl(sizeof(data->random_number));
-			data[i].random_number = htonl(query_ctx->res[i].random_number);
-		}
+	update_ctx->num_query_in_progress--;
+	h2o_mem_release_shared(update_ctx);
+}
 
 
-		const int rc = PQputCopyData(db_conn, (const char *) &data, sizeof(data));
+static result_return_t on_update_result(db_query_param_t *param, PGresult *result)
+{
+	update_param_t * const update_param = H2O_STRUCT_FROM_MEMBER(update_param_t,
+	                                                             param,
+	                                                             param);
+	update_ctx_t * const update_ctx = update_param->ctx;
+	result_return_t ret = DONE;
+
+	if (update_ctx->gen) {
+		if (update_param->update) {
+			if (PQresultStatus(result) == PGRES_COMMAND_OK) {
+				thread_context_t * const ctx =
+					H2O_STRUCT_FROM_MEMBER(thread_context_t,
+					                       event_loop.h2o_ctx,
+					                       update_ctx->req->conn->ctx);
+				const size_t num_query_remaining =
+					update_ctx->num_query - update_ctx->num_result;
+
+				update_ctx->num_query_in_progress--;
+
+				if (update_ctx->num_query_in_progress < num_query_remaining) {
+					const size_t idx = update_ctx->num_result +
+					                   update_ctx->num_query_in_progress;
+
+					update_param->random_number =
+						get_random_number(MAX_ID, &ctx->random_seed) + 1;
+					update_param->update = false;
+					snprintf(update_param->command,
+					         MAX_UPDATE_QUERY_LEN,
+					         UPDATE_QUERY,
+					         update_ctx->res[idx].id,
+					         update_ctx->res[idx].id,
+					         update_param->random_number);
+
+					if (!execute_query(ctx, &update_param->param)) {
+						update_ctx->num_query_in_progress++;
+						PQclear(result);
+						return DONE;
+					}
+
+					yajl_gen_free(update_ctx->gen);
+					update_ctx->gen = NULL;
+					send_service_unavailable_error(DB_REQ_ERROR,
+					                               update_ctx->req);
+				}
+				else if (update_ctx->num_result == update_ctx->num_query)
+					serialize_items(update_ctx->res,
+					                update_ctx->num_result,
+					                update_ctx->gen,
+					                update_ctx->req);
+
+				h2o_mem_release_shared(update_ctx);
+			}
+			else
+				on_update_error(param, PQresultErrorMessage(result));
+		}
+		else if (PQresultStatus(result) == PGRES_TUPLES_OK) {
+			process_result(result, update_ctx->res + update_ctx->num_result);
+			update_ctx->res[update_ctx->num_result].random_number =
+				update_param->random_number;
+			update_ctx->num_result++;
+			update_param->update = true;
+			ret = SUCCESS;
+		}
+		else
+			on_update_error(param, PQresultErrorMessage(result));
+	}
+	else {
+		update_ctx->num_query_in_progress--;
+		h2o_mem_release_shared(update_ctx);
+	}
 
 
-		if (!rc)
-			return 1;
-		else if (rc < 0)
-			return rc;
+	PQclear(result);
+	return ret;
+}
 
 
-		query_ctx->num_result = query_ctx->num_query;
+static void on_update_timeout(db_query_param_t *param)
+{
+	const update_param_t * const query_param = H2O_STRUCT_FROM_MEMBER(update_param_t,
+	                                                                  param,
+	                                                                  param);
+	update_ctx_t * const update_ctx = query_param->ctx;
+
+	if (update_ctx->gen) {
+		yajl_gen_free(update_ctx->gen);
+		update_ctx->gen = NULL;
+		send_error(GATEWAY_TIMEOUT, DB_TIMEOUT_ERROR, update_ctx->req);
 	}
 	}
 
 
-	if (query_ctx->num_result == query_ctx->num_query) {
-		const int rc = PQputCopyEnd(db_conn, NULL);
+	update_ctx->num_query_in_progress--;
+	h2o_mem_release_shared(update_ctx);
+}
 
 
-		if (!rc)
-			return 1;
-		else if (rc < 0)
-			return rc;
+static void process_result(PGresult *result, query_result_t *out)
+{
+	assert(PQnfields(result) == 2);
+	assert(PQntuples(result) == 1);
+
+	const char * const id = PQgetvalue(result, 0, 0);
+	const char * const random_number = PQgetvalue(result, 0, 1);
+
+	if (PQfformat(result, 0)) {
+		assert(PQgetlength(result, 0, 0) == sizeof(out->id));
+		// Use memcpy() in case the result is not aligned; the reason we are
+		// copying over the id is because the results may arrive in any order.
+		memcpy(&out->id, id, sizeof(out->id));
+		out->id = ntohl(out->id);
 	}
 	}
+	else
+		out->id = atoi(id);
 
 
-	return PQflush(db_conn);
+	if (PQfformat(result, 1)) {
+		assert(PQgetlength(result, 0, 1) == sizeof(out->random_number));
+		// Use memcpy() in case the result is not aligned.
+		memcpy(&out->random_number, random_number, sizeof(out->random_number));
+		out->random_number = ntohl(out->random_number);
+	}
+	else
+		out->random_number = atoi(random_number);
 }
 }
 
 
 static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen)
 static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen)
@@ -422,22 +466,16 @@ static void serialize_items(const query_result_t *res,
                             yajl_gen gen,
                             yajl_gen gen,
                             h2o_req_t *req)
                             h2o_req_t *req)
 {
 {
-	// In principle the JSON generator can be created here, but we do it earlier,
-	// so that it happens in parallel with the database query; we assume that the
-	// allocation will rarely fail, so that the delayed error handling is not
-	// problematic.
-	if (gen) {
-		CHECK_YAJL_STATUS(yajl_gen_array_open, gen);
+	CHECK_YAJL_STATUS(yajl_gen_array_open, gen);
 
 
-		for (size_t i = 0; i < num_result; i++)
-			if (serialize_item(res[i].id, res[i].random_number, gen))
-				goto error_yajl;
+	for (size_t i = 0; i < num_result; i++)
+		if (serialize_item(res[i].id, res[i].random_number, gen))
+			goto error_yajl;
 
 
-		CHECK_YAJL_STATUS(yajl_gen_array_close, gen);
+	CHECK_YAJL_STATUS(yajl_gen_array_close, gen);
 
 
-		if (!send_json_response(gen, false, req))
-			return;
-	}
+	if (!send_json_response(gen, false, req))
+		return;
 
 
 error_yajl:
 error_yajl:
 	send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
 	send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
@@ -447,7 +485,73 @@ int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req)
 {
 {
 	IGNORE_FUNCTION_PARAMETER(self);
 	IGNORE_FUNCTION_PARAMETER(self);
 
 
-	return do_multiple_queries(NO_UPDATE, req);
+	thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
+	                                                      event_loop.h2o_ctx,
+	                                                      req->conn->ctx);
+
+	const size_t num_query = get_query_number(req);
+	size_t base_size = offsetof(multiple_query_ctx_t, res) + num_query * sizeof(query_result_t);
+
+	base_size = ((base_size + _Alignof(query_param_t) - 1) / _Alignof(query_param_t));
+	base_size = base_size * _Alignof(query_param_t);
+
+	const size_t num_query_in_progress = MIN(num_query, ctx->config->max_db_conn_num);
+	const size_t sz = base_size + num_query_in_progress * sizeof(query_param_t);
+
+	multiple_query_ctx_t * const query_ctx = h2o_mem_alloc_shared(&req->pool,
+	                                                              sz,
+	                                                              cleanup_multiple_query);
+
+	if (query_ctx) {
+		memset(query_ctx, 0, sz);
+		query_ctx->num_query = num_query;
+		query_ctx->num_query_in_progress = num_query_in_progress;
+		query_ctx->req = req;
+		query_ctx->query_param = (query_param_t *) ((char *) query_ctx + base_size);
+		initialize_ids(num_query, query_ctx->res, &ctx->random_seed);
+
+		for (size_t i = 0; i < query_ctx->num_query_in_progress; i++) {
+			query_ctx->query_param[i].ctx = query_ctx;
+			// We need a copy of id because the original may be overwritten
+			// by a completed query.
+			query_ctx->query_param[i].id = htonl(query_ctx->res[i].id);
+			query_ctx->query_param[i].id_format = 1;
+			query_ctx->query_param[i].id_len = sizeof(query_ctx->query_param[i].id);
+			query_ctx->query_param[i].id_pointer =
+				(const char *) &query_ctx->query_param[i].id;
+			query_ctx->query_param[i].param.command = WORLD_TABLE_NAME;
+			query_ctx->query_param[i].param.nParams = 1;
+			query_ctx->query_param[i].param.on_error = on_multiple_query_error;
+			query_ctx->query_param[i].param.on_result = on_multiple_query_result;
+			query_ctx->query_param[i].param.on_timeout = on_multiple_query_timeout;
+			query_ctx->query_param[i].param.paramFormats =
+				&query_ctx->query_param[i].id_format;
+			query_ctx->query_param[i].param.paramLengths =
+				&query_ctx->query_param[i].id_len;
+			query_ctx->query_param[i].param.paramValues =
+				&query_ctx->query_param[i].id_pointer;
+			query_ctx->query_param[i].param.flags = IS_PREPARED;
+			query_ctx->query_param[i].param.resultFormat = 1;
+
+			if (execute_query(ctx, &query_ctx->query_param[i].param)) {
+				query_ctx->num_query_in_progress = i;
+				send_service_unavailable_error(DB_REQ_ERROR, req);
+				return 0;
+			}
+
+			h2o_mem_addref_shared(query_ctx);
+		}
+
+		// Create a JSON generator while the queries are processed.
+		query_ctx->gen = get_json_generator(&req->pool);
+
+		if (!query_ctx->gen)
+			send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
+	}
+	else
+		send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
+
+	return 0;
 }
 }
 
 
 int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
 int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
@@ -460,8 +564,22 @@ int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
 	single_query_ctx_t * const query_ctx = h2o_mem_alloc_pool(&req->pool, sizeof(*query_ctx));
 	single_query_ctx_t * const query_ctx = h2o_mem_alloc_pool(&req->pool, sizeof(*query_ctx));
 
 
 	if (query_ctx) {
 	if (query_ctx) {
-		initialize_single_query_context(req, on_single_query_result, query_ctx);
+		memset(query_ctx, 0, sizeof(*query_ctx));
 		query_ctx->id = htonl(get_random_number(MAX_ID, &ctx->random_seed) + 1);
 		query_ctx->id = htonl(get_random_number(MAX_ID, &ctx->random_seed) + 1);
+		query_ctx->id_format = 1;
+		query_ctx->id_len = sizeof(query_ctx->id);
+		query_ctx->id_pointer = (const char *) &query_ctx->id;
+		query_ctx->param.command = WORLD_TABLE_NAME;
+		query_ctx->param.nParams = 1;
+		query_ctx->param.on_error = on_single_query_error;
+		query_ctx->param.on_result = on_single_query_result;
+		query_ctx->param.on_timeout = on_single_query_timeout;
+		query_ctx->param.paramFormats = &query_ctx->id_format;
+		query_ctx->param.paramLengths = &query_ctx->id_len;
+		query_ctx->param.paramValues = &query_ctx->id_pointer;
+		query_ctx->param.flags = IS_PREPARED;
+		query_ctx->param.resultFormat = 1;
+		query_ctx->req = req;
 
 
 		if (execute_query(ctx, &query_ctx->param))
 		if (execute_query(ctx, &query_ctx->param))
 			send_service_unavailable_error(DB_REQ_ERROR, req);
 			send_service_unavailable_error(DB_REQ_ERROR, req);
@@ -476,5 +594,67 @@ int updates(struct st_h2o_handler_t *self, h2o_req_t *req)
 {
 {
 	IGNORE_FUNCTION_PARAMETER(self);
 	IGNORE_FUNCTION_PARAMETER(self);
 
 
-	return do_multiple_queries(CREATE, req);
+	thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
+	                                                      event_loop.h2o_ctx,
+	                                                      req->conn->ctx);
+
+	const size_t num_query = get_query_number(req);
+	size_t base_size = offsetof(update_ctx_t, res) + num_query * sizeof(query_result_t);
+
+	base_size = ((base_size + _Alignof(update_param_t) - 1) / _Alignof(update_param_t));
+	base_size = base_size * _Alignof(update_param_t);
+
+	const size_t num_query_in_progress = MIN(num_query, ctx->config->max_db_conn_num);
+	const size_t struct_size = base_size + num_query_in_progress * sizeof(update_param_t);
+	const size_t sz = struct_size + num_query_in_progress * MAX_UPDATE_QUERY_LEN;
+	update_ctx_t * const update_ctx = h2o_mem_alloc_shared(&req->pool, sz, cleanup_update);
+
+	if (update_ctx) {
+		memset(update_ctx, 0, struct_size);
+		update_ctx->num_query = num_query;
+		update_ctx->num_query_in_progress = num_query_in_progress;
+		update_ctx->req = req;
+		update_ctx->update_param = (update_param_t *) ((char *) update_ctx + base_size);
+		initialize_ids(num_query, update_ctx->res, &ctx->random_seed);
+
+		char *command = (char *) update_ctx + struct_size;
+
+		for (size_t i = 0; i < update_ctx->num_query_in_progress; i++) {
+			update_ctx->update_param[i].command = command;
+			command += MAX_UPDATE_QUERY_LEN;
+			update_ctx->update_param[i].ctx = update_ctx;
+			update_ctx->update_param[i].param.command =
+				update_ctx->update_param[i].command;
+			update_ctx->update_param[i].param.on_error = on_update_error;
+			update_ctx->update_param[i].param.on_result = on_update_result;
+			update_ctx->update_param[i].param.on_timeout = on_update_timeout;
+			update_ctx->update_param[i].param.resultFormat = 1;
+			update_ctx->update_param[i].random_number =
+				get_random_number(MAX_ID, &ctx->random_seed) + 1;
+			snprintf(update_ctx->update_param[i].command,
+			         MAX_UPDATE_QUERY_LEN,
+			         UPDATE_QUERY,
+			         update_ctx->res[i].id,
+			         update_ctx->res[i].id,
+			         update_ctx->update_param[i].random_number);
+
+			if (execute_query(ctx, &update_ctx->update_param[i].param)) {
+				update_ctx->num_query_in_progress = i;
+				send_service_unavailable_error(DB_REQ_ERROR, req);
+				return 0;
+			}
+
+			h2o_mem_addref_shared(update_ctx);
+		}
+
+		// Create a JSON generator while the queries are processed.
+		update_ctx->gen = get_json_generator(&req->pool);
+
+		if (!update_ctx->gen)
+			send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
+	}
+	else
+		send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
+
+	return 0;
 }
 }