Browse Source

H2O: Optimize the cached queries and the database tests (#7655)

* H2O: Implement database query pipelining

* H2O: Switch to thread-local caches again

This change affects the cached queries test.
Anton Kirilov 2 years ago
parent
commit
eaef3b5039

+ 23 - 11
frameworks/C/h2o/CMakeLists.txt

@@ -1,22 +1,34 @@
-cmake_minimum_required(VERSION 3.16.0)
+cmake_minimum_required(VERSION 3.18.0)
 project(h2o_app)
-find_library(H2O_LIB h2o-evloop)
-find_library(MUSTACHE_C_LIB mustache_c)
-find_library(YAJL_LIB yajl)
-find_path(H2O_INCLUDE h2o.h)
-find_path(MUSTACHE_C_INCLUDE mustache.h)
-find_path(YAJL_INCLUDE yajl/yajl_gen.h)
+find_library(CRYPTO_LIB crypto REQUIRED)
+find_library(H2O_LIB h2o-evloop REQUIRED)
+find_library(MUSTACHE_C_LIB mustache_c REQUIRED)
+find_library(NUMA_LIB numa REQUIRED)
+find_library(PQ_LIB pq REQUIRED)
+find_library(SSL_LIB ssl REQUIRED)
+find_library(YAJL_LIB yajl REQUIRED)
+find_library(Z_LIB z REQUIRED)
+find_path(H2O_INCLUDE h2o.h REQUIRED)
+find_path(MUSTACHE_C_INCLUDE mustache.h REQUIRED)
+find_path(NUMA_INCLUDE numaif.h REQUIRED)
+find_path(OPENSSL_INCLUDE openssl/ssl.h REQUIRED)
+find_path(PQ_INCLUDE postgresql/libpq-fe.h REQUIRED)
+find_path(YAJL_INCLUDE yajl/yajl_gen.h REQUIRED)
+include_directories(src ${H2O_INCLUDE} ${MUSTACHE_C_INCLUDE} ${NUMA_INCLUDE} ${OPENSSL_INCLUDE})
+include_directories(${PQ_INCLUDE} ${YAJL_INCLUDE})
+set(CMAKE_C_STANDARD 11)
+set(CMAKE_C_STANDARD_REQUIRED ON)
+add_compile_definitions(H2O_USE_LIBUV=0)
 set(COMMON_OPTIONS -flto -pthread)
-add_compile_options(-std=gnu11 -pedantic -Wall -Wextra ${COMMON_OPTIONS})
+add_compile_options(-pedantic -Wall -Wextra ${COMMON_OPTIONS})
 set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -D_FORTIFY_SOURCE=2")
 set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -O3")
 set(CMAKE_C_FLAGS_RELWITHDEBINFO "${CMAKE_C_FLAGS_RELWITHDEBINFO} -O3")
-add_definitions(-DH2O_USE_LIBUV=0)
-include_directories(src ${H2O_INCLUDE} ${MUSTACHE_C_INCLUDE} ${YAJL_INCLUDE})
 file(GLOB_RECURSE SOURCES "src/*.c")
 add_executable(${PROJECT_NAME} ${SOURCES})
 target_link_libraries(${PROJECT_NAME} ${COMMON_OPTIONS})
-target_link_libraries(${PROJECT_NAME} ${H2O_LIB} ssl crypto numa pq z ${MUSTACHE_C_LIB} ${YAJL_LIB})
+target_link_libraries(${PROJECT_NAME} ${H2O_LIB} ${MUSTACHE_C_LIB} ${NUMA_LIB} ${PQ_LIB} ${SSL_LIB})
+target_link_libraries(${PROJECT_NAME} ${CRYPTO_LIB} ${YAJL_LIB} ${Z_LIB})
 install(TARGETS ${PROJECT_NAME} RUNTIME DESTINATION bin)
 file(GLOB TEMPLATES "template/*")
 install(FILES ${TEMPLATES} DESTINATION share/${PROJECT_NAME}/template)

+ 11 - 12
frameworks/C/h2o/README.md

@@ -1,29 +1,28 @@
 # h2o
 
-This is a framework implementation using the [H2O](https://h2o.examp1e.net) HTTP server. It builds directly on top of `libh2o` instead of running the standalone server.
+This is a framework implementation using the [H2O](https://h2o.examp1e.net) HTTP server. It
+builds directly on top of `libh2o` instead of running the standalone server.
 
 ## Requirements
 
-[CMake](https://cmake.org), [H2O](https://h2o.examp1e.net), [libpq](https://www.postgresql.org), [mustache-c](https://github.com/x86-64/mustache-c), [OpenSSL](https://www.openssl.org), [YAJL](https://lloyd.github.io/yajl)
+[CMake](https://cmake.org), [H2O](https://h2o.examp1e.net), [libpq](https://www.postgresql.org),
+[mustache-c](https://github.com/x86-64/mustache-c), [numactl](https://github.com/numactl/numactl),
+[OpenSSL](https://www.openssl.org), [YAJL](https://lloyd.github.io/yajl)
 
 ## Test implementations
 
-The test implementations are located into the `src/handlers` directory.
+The test implementations are located into the `src/handlers` directory - refer to the
+`initialize_*_handler*()` functions.
 
 ## Performance tuning
 
-If the test environment changes, it will probably be necessary to tune some of the framework settings in order to achieve the best performance possible. The most significant parameter is the maximum number of database connections per thread, which is controlled by the `DB_CONN` variable in the `h2o.sh` script.
+If the test environment changes, it will probably be necessary to tune some of the framework
+settings in order to achieve the best performance possible. The most significant parameter is
+the maximum number of database connections per thread, which is controlled by the `DB_CONN`
+variable in the `h2o.sh` script.
 
 ## Performance issues
 
-### Database tests
-
-`libpq` does not support command pipelining, and implementing anything equivalent on top of it conflicts with the requirements.
-
-### Database updates
-
-In the Citrine environment the database connection settings that improve the performance on the updates test make the other database results worse, and vice versa.
-
 ### Plaintext
 
 `libh2o` performs at least one system call per pipelined response.

+ 2 - 2
frameworks/C/h2o/h2o.dockerfile

@@ -19,7 +19,7 @@ RUN mkdir -p "$MUSTACHE_C_BUILD_DIR" && \
     cd "$MUSTACHE_C_BUILD_DIR" && \
     wget -qO - "https://github.com/x86-64/mustache-c/archive/${MUSTACHE_C_REVISION}.tar.gz" | \
     tar xz --strip-components=1 && \
-    CFLAGS="-O3 -flto -march=native" ./autogen.sh --prefix="$MUSTACHE_C_PREFIX" && \
+    CFLAGS="-O3 -flto -march=native -mtune=native" ./autogen.sh --prefix="$MUSTACHE_C_PREFIX" && \
     make -j "$(nproc)" install && \
     cd .. && \
     rm -rf "$MUSTACHE_C_BUILD_DIR"
@@ -36,7 +36,7 @@ RUN mkdir -p "${H2O_BUILD_DIR}/build" && \
     wget -qO - "https://github.com/h2o/h2o/archive/${H2O_VERSION}.tar.gz" | \
     tar xz --strip-components=1 && \
     cd build && \
-    cmake -DCMAKE_INSTALL_PREFIX="$H2O_PREFIX" -DCMAKE_C_FLAGS="-flto -march=native" \
+    cmake -DCMAKE_INSTALL_PREFIX="$H2O_PREFIX" -DCMAKE_C_FLAGS="-flto -march=native -mtune=native" \
           -DCMAKE_AR=/usr/bin/gcc-ar -DCMAKE_RANLIB=/usr/bin/gcc-ranlib -G Ninja .. && \
     cmake --build . -j && \
     cmake --install . && \

+ 4 - 3
frameworks/C/h2o/h2o.sh

@@ -25,16 +25,17 @@ if [[ -z "$MUSTACHE_C_PREFIX" ]]; then
 fi
 
 if [[ "$BENCHMARK_ENV" = "Azure" ]]; then
-	DB_CONN=5
+	DB_CONN=2
 else
-	DB_CONN=5
+	DB_CONN=1
 fi
 
 build_h2o_app()
 {
 	cmake -DCMAKE_INSTALL_PREFIX="$H2O_APP_PREFIX" -DCMAKE_BUILD_TYPE=Release \
 	      -DCMAKE_PREFIX_PATH="${H2O_PREFIX};${MUSTACHE_C_PREFIX}" \
-	      -DCMAKE_C_FLAGS="-march=native $1" -G Ninja "$H2O_APP_SRC_ROOT"
+	      -DCMAKE_C_FLAGS="-march=native -mtune=native $1" -G Ninja \
+	      "$H2O_APP_SRC_ROOT"
 	cmake --build . --clean-first -j
 }
 

+ 495 - 300
frameworks/C/h2o/src/database.c

@@ -30,21 +30,25 @@
 
 #include "database.h"
 #include "error.h"
+#include "global_data.h"
 #include "list.h"
-#include "thread.h"
 
-#define IS_RESETTING 1
-#define IS_WRITING 2
+#define EXPECT_SYNC 1
+#define IGNORE_RESULT 2
+#define IS_RESETTING 4
+#define MS_IN_S 1000
 
 typedef struct {
 	list_t l;
 	PGconn *conn;
-	thread_context_t *ctx;
-	db_query_param_t *param;
-	list_t *prepared_statement;
+	db_conn_pool_t *pool;
+	const list_t *prepared_statement;
+	queue_t queries;
 	h2o_socket_t *sock;
+	size_t query_num;
 	uint_fast32_t flags;
-	h2o_timeout_entry_t h2o_timeout_entry;
+	int sd;
+	h2o_timeout_entry_t timeout;
 } db_conn_t;
 
 typedef struct {
@@ -53,98 +57,117 @@ typedef struct {
 	const char *query;
 } prepared_statement_t;
 
-static int do_database_write(db_conn_t *db_conn);
-static int do_execute_query(db_conn_t *db_conn, bool direct_notification);
-static void error_notification(thread_context_t *ctx, bool timeout, const char *error_string);
-static void on_database_connect_error(db_conn_t *db_conn, bool timeout, const char *error_string);
+static h2o_socket_t *create_socket(int sd, h2o_loop_t *loop);
+static int do_execute_query(db_conn_t *conn, db_query_param_t *param);
+static void error_notification(db_conn_pool_t *pool, bool timeout, const char *error_string);
+static void on_database_connect_error(db_conn_t *conn, bool timeout, const char *error_string);
+static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err);
 static void on_database_connect_timeout(h2o_timeout_entry_t *entry);
-static void on_database_error(db_conn_t *db_conn, const char *error_string);
-static void on_database_read_ready(h2o_socket_t *db_sock, const char *err);
-static void on_database_timeout(h2o_timeout_entry_t *entry);
-static void on_database_write_ready(h2o_socket_t *db_sock, const char *err);
-static void poll_database_connection(h2o_socket_t *db_sock, const char *err);
-static void process_query(db_conn_t *db_conn);
-static void start_database_connect(thread_context_t *ctx, db_conn_t *db_conn);
-
-static int do_database_write(db_conn_t *db_conn)
+static void on_database_connect_write_ready(h2o_socket_t *sock, const char *err);
+static void on_database_error(db_conn_t *conn, const char *error_string);
+static void on_database_read_ready(h2o_socket_t *sock, const char *err);
+static void on_database_timeout(h2o_timeout_entry_t *timeout);
+static void on_database_write_ready(h2o_socket_t *sock, const char *err);
+static void poll_database_connection(h2o_socket_t *sock, const char *err);
+static void prepare_statements(db_conn_t *conn);
+static void process_queries(db_conn_t *conn);
+static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn);
+
+static h2o_socket_t *create_socket(int sd, h2o_loop_t *loop)
 {
-	assert(db_conn->param);
+	sd = dup(sd);
 
-	int ret = db_conn->param->on_write_ready(db_conn->param, db_conn->conn);
+	if (sd < 0) {
+		STANDARD_ERROR("dup");
+		return NULL;
+	}
+
+	const int flags = fcntl(sd, F_GETFD);
 
-	if (!ret)
-		db_conn->flags &= ~IS_WRITING;
-	else if (ret < 0) {
-		ERROR(PQerrorMessage(db_conn->conn));
-		on_database_error(db_conn, DB_ERROR);
+	if (flags < 0 || fcntl(sd, F_SETFD, flags | FD_CLOEXEC)) {
+		STANDARD_ERROR("fcntl");
+		close(sd);
+		return NULL;
 	}
-	else {
-		h2o_socket_notify_write(db_conn->sock, on_database_write_ready);
-		ret = 0;
+
+	h2o_socket_t * const ret = h2o_evloop_socket_create(loop, sd, H2O_SOCKET_FLAG_DONT_READ);
+
+	if (!ret) {
+		errno = ENOMEM;
+		STANDARD_ERROR("h2o_evloop_socket_create");
+		close(sd);
 	}
 
 	return ret;
 }
 
-static int do_execute_query(db_conn_t *db_conn, bool direct_notification)
+static int do_execute_query(db_conn_t *conn, db_query_param_t *param)
 {
-	const int ec = db_conn->param->flags & IS_PREPARED ?
-	               PQsendQueryPrepared(db_conn->conn,
-	                                   db_conn->param->command,
-	                                   db_conn->param->nParams,
-	                                   db_conn->param->paramValues,
-	                                   db_conn->param->paramLengths,
-	                                   db_conn->param->paramFormats,
-	                                   db_conn->param->resultFormat) :
-	               PQsendQuery(db_conn->conn, db_conn->param->command);
-	int ret = 1;
-
-	if (ec) {
-		if (db_conn->param->flags & IS_SINGLE_ROW)
-			PQsetSingleRowMode(db_conn->conn);
-
-		db_conn->h2o_timeout_entry.cb = on_database_timeout;
-		h2o_timeout_link(db_conn->ctx->event_loop.h2o_ctx.loop,
-		                 &db_conn->ctx->db_state.h2o_timeout,
-		                 &db_conn->h2o_timeout_entry);
-		h2o_socket_read_start(db_conn->sock, on_database_read_ready);
-
-		const int send_status = PQflush(db_conn->conn);
+	assert(conn->query_num);
+	assert((conn->queries.head && conn->query_num < conn->pool->config->max_pipeline_query_num) ||
+	       (!conn->queries.head && conn->query_num == conn->pool->config->max_pipeline_query_num));
+
+	const int ec = param->flags & IS_PREPARED ?
+	               PQsendQueryPrepared(conn->conn,
+	                                   param->command,
+	                                   param->nParams,
+	                                   param->paramValues,
+	                                   param->paramLengths,
+	                                   param->paramFormats,
+	                                   param->resultFormat) :
+	               PQsendQueryParams(conn->conn,
+	                                 param->command,
+	                                 param->nParams,
+	                                 param->paramTypes,
+	                                 param->paramValues,
+	                                 param->paramLengths,
+	                                 param->paramFormats,
+	                                 param->resultFormat);
+
+	if (!ec) {
+		ERROR(PQerrorMessage(conn->conn));
+		return 1;
+	}
 
-		if (send_status < 0) {
-			if (direct_notification)
-				db_conn->param = NULL;
+	if (!PQpipelineSync(conn->conn)) {
+		LIBRARY_ERROR("PQpipelineSync", PQerrorMessage(conn->conn));
+		return 1;
+	}
 
-			LIBRARY_ERROR("PQflush", PQerrorMessage(db_conn->conn));
-			on_database_error(db_conn, DB_ERROR);
-		}
-		else {
-			ret = 0;
+	const int send_status = PQflush(conn->conn);
 
-			if (send_status)
-				h2o_socket_notify_write(db_conn->sock, on_database_write_ready);
-		}
+	if (send_status < 0) {
+		LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
+		return 1;
 	}
-	else {
-		if (direct_notification)
-			db_conn->param = NULL;
-
-		ERROR(PQerrorMessage(db_conn->conn));
-		on_database_error(db_conn, DB_ERROR);
+	else if (send_status)
+		h2o_socket_notify_write(conn->sock, on_database_write_ready);
+
+	if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IGNORE_RESULT))) {
+		assert(!h2o_timeout_is_linked(&conn->timeout));
+		conn->timeout.cb = on_database_timeout;
+		h2o_timeout_link(conn->pool->loop, &conn->pool->timeout, &conn->timeout);
+		h2o_socket_read_start(conn->sock, on_database_read_ready);
 	}
 
-	return ret;
+	param->l.next = NULL;
+	*conn->queries.tail = &param->l;
+	conn->queries.tail = &param->l.next;
+	conn->query_num--;
+	return 0;
 }
 
-static void error_notification(thread_context_t *ctx, bool timeout, const char *error_string)
+static void error_notification(db_conn_pool_t *pool, bool timeout, const char *error_string)
 {
-	if (!--ctx->db_state.db_conn_num) {
+	assert(pool->conn_num < pool->config->max_db_conn_num);
+
+	if (++pool->conn_num == pool->config->max_db_conn_num) {
 		// We don't want to keep requests waiting for an unbounded amount of time.
-		list_t *iter = ctx->db_state.queries.head;
+		list_t *iter = pool->queries.head;
 
-		ctx->db_state.queries.head = NULL;
-		ctx->db_state.queries.tail = &ctx->db_state.queries.head;
-		ctx->db_state.query_num = 0;
+		pool->queries.head = NULL;
+		pool->queries.tail = &pool->queries.head;
+		pool->query_num = pool->config->max_query_num;
 
 		if (iter)
 			do {
@@ -161,297 +184,443 @@ static void error_notification(thread_context_t *ctx, bool timeout, const char *
 	}
 }
 
-static void on_database_connect_error(db_conn_t *db_conn, bool timeout, const char *error_string)
+static void on_database_connect_error(db_conn_t *conn, bool timeout, const char *error_string)
 {
-	thread_context_t * const ctx = db_conn->ctx;
-
-	error_notification(ctx, timeout, error_string);
-	h2o_timeout_unlink(&db_conn->h2o_timeout_entry);
-	h2o_socket_read_stop(db_conn->sock);
-	h2o_socket_close(db_conn->sock);
-	PQfinish(db_conn->conn);
-	free(db_conn);
+	error_notification(conn->pool, timeout, error_string);
+	h2o_timeout_unlink(&conn->timeout);
+	h2o_socket_read_stop(conn->sock);
+	h2o_socket_close(conn->sock);
+	PQfinish(conn->conn);
+	free(conn);
+}
+
+static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
+{
+	db_conn_t * const conn = sock->data;
+
+	if (err) {
+		ERROR(err);
+		on_database_connect_error(conn, false, DB_ERROR);
+		return;
+	}
+
+	if (!PQconsumeInput(conn->conn)) {
+		LIBRARY_ERROR("PQconsumeInput", PQerrorMessage(conn->conn));
+		on_database_connect_error(conn, false, DB_ERROR);
+		return;
+	}
+
+	const int send_status = PQflush(conn->conn);
+
+	if (send_status < 0) {
+		LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
+		on_database_connect_error(conn, false, DB_ERROR);
+		return;
+	}
+	else if (send_status) {
+		h2o_socket_notify_write(conn->sock, on_database_write_ready);
+		return;
+	}
+
+	while (!PQisBusy(conn->conn)) {
+		PGresult * const result = PQgetResult(conn->conn);
+
+		if (result) {
+			switch (PQresultStatus(result)) {
+				case PGRES_COMMAND_OK:
+					break;
+				case PGRES_PIPELINE_SYNC:
+					PQclear(result);
+					h2o_timeout_unlink(&conn->timeout);
+					h2o_socket_read_stop(conn->sock);
+					process_queries(conn);
+					return;
+				default:
+					LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
+					PQclear(result);
+					on_database_connect_error(conn, false, DB_ERROR);
+					return;
+			}
+
+			PQclear(result);
+		}
+	}
 }
 
 static void on_database_connect_timeout(h2o_timeout_entry_t *entry)
 {
-	db_conn_t * const db_conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, h2o_timeout_entry, entry);
+	db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, timeout, entry);
 
 	ERROR(DB_TIMEOUT_ERROR);
-	on_database_connect_error(db_conn, true, DB_TIMEOUT_ERROR);
+	on_database_connect_error(conn, true, DB_TIMEOUT_ERROR);
 }
 
-static void on_database_error(db_conn_t *db_conn, const char *error_string)
+static void on_database_connect_write_ready(h2o_socket_t *sock, const char *err)
 {
-	if (db_conn->prepared_statement)
-		on_database_connect_error(db_conn, false, error_string);
+	db_conn_t * const conn = sock->data;
+
+	if (err) {
+		ERROR(err);
+		on_database_connect_error(conn, false, err);
+	}
 	else {
-		if (db_conn->param) {
-			db_conn->param->on_error(db_conn->param, error_string);
-			db_conn->param = NULL;
-		}
+		const int send_status = PQflush(conn->conn);
 
-		if (PQstatus(db_conn->conn) == CONNECTION_OK) {
-			h2o_timeout_unlink(&db_conn->h2o_timeout_entry);
-			h2o_socket_read_stop(db_conn->sock);
-			process_query(db_conn);
+		if (send_status < 0) {
+			LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
+			on_database_connect_error(conn, false, DB_ERROR);
 		}
-		else
-			start_database_connect(db_conn->ctx, db_conn);
+		else if (send_status)
+			h2o_socket_notify_write(conn->sock, on_database_connect_write_ready);
 	}
 }
 
-static void on_database_read_ready(h2o_socket_t *db_sock, const char *err)
+static void on_database_error(db_conn_t *conn, const char *error_string)
 {
-	db_conn_t * const db_conn = db_sock->data;
+	if (conn->queries.head)
+		do {
+			db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER(db_query_param_t,
+			                                                        l,
+			                                                        conn->queries.head);
 
-	if (err)
+			// The callback may free the db_query_param_t structure.
+			conn->queries.head = param->l.next;
+			param->on_error(param, error_string);
+		} while (conn->queries.head);
+
+	start_database_connect(conn->pool, conn);
+}
+
+static void on_database_read_ready(h2o_socket_t *sock, const char *err)
+{
+	db_conn_t * const conn = sock->data;
+
+	if (err) {
 		ERROR(err);
-	else {
-		if (PQconsumeInput(db_conn->conn)) {
-			const int send_status = PQflush(db_conn->conn);
-
-			if (send_status > 0)
-				h2o_socket_notify_write(db_conn->sock, on_database_write_ready);
-
-			if (send_status >= 0) {
-				while (!PQisBusy(db_conn->conn)) {
-					PGresult * const result = PQgetResult(db_conn->conn);
-
-					if (db_conn->param)
-						switch (db_conn->param->on_result(db_conn->param, result)) {
-							case WANT_WRITE:
-								db_conn->flags |= IS_WRITING;
-
-								if (do_database_write(db_conn))
-									return;
-
-								break;
-							case DONE:
-								db_conn->param = NULL;
-								h2o_timeout_unlink(&db_conn->h2o_timeout_entry);
-								break;
-							default:
-								break;
-						}
-					else if (result) {
-						if (PQresultStatus(result) != PGRES_COMMAND_OK)
-							LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
-
-						PQclear(result);
-					}
-
-					if (!result) {
-						assert(!db_conn->param);
-						h2o_timeout_unlink(&db_conn->h2o_timeout_entry);
-						h2o_socket_read_stop(db_conn->sock);
-						process_query(db_conn);
-						break;
-					}
-				}
+		on_database_error(conn, err);
+		return;
+	}
+
+	if (!PQconsumeInput(conn->conn)) {
+		LIBRARY_ERROR("PQconsumeInput", PQerrorMessage(conn->conn));
+		on_database_error(conn, DB_ERROR);
+		return;
+	}
+
+	const int send_status = PQflush(conn->conn);
 
+	if (send_status < 0) {
+		LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
+		on_database_error(conn, DB_ERROR);
+		return;
+	}
+	else if (send_status) {
+		h2o_socket_notify_write(conn->sock, on_database_write_ready);
+		return;
+	}
+
+	while (!PQisBusy(conn->conn)) {
+		PGresult * const result = PQgetResult(conn->conn);
+
+		if (conn->flags & IGNORE_RESULT) {
+			if (result)
+				PQclear(result);
+			else
+				conn->flags &= ~IGNORE_RESULT;
+		}
+		else if (conn->flags & EXPECT_SYNC) {
+			if (PQresultStatus(result) == PGRES_PIPELINE_SYNC) {
+				PQclear(result);
+				conn->flags &= ~EXPECT_SYNC;
+			}
+			else {
+				LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
+				PQclear(result);
+				on_database_error(conn, DB_ERROR);
 				return;
 			}
 		}
-
-		ERROR(PQerrorMessage(db_conn->conn));
+		else if (conn->queries.head) {
+			db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER(db_query_param_t,
+			                                                        l,
+			                                                        conn->queries.head);
+			// The callback may free the db_query_param_t structure.
+			list_t * const next = param->l.next;
+			const bool nonnull_result = !!result;
+
+			if (param->on_result(param, result) == DONE) {
+				conn->query_num++;
+				h2o_timeout_unlink(&conn->timeout);
+				conn->timeout.cb = on_database_timeout;
+				h2o_timeout_link(conn->pool->loop, &conn->pool->timeout, &conn->timeout);
+				conn->flags |= EXPECT_SYNC;
+				conn->queries.head = next;
+
+				if (!next)
+					conn->queries.tail = &conn->queries.head;
+
+				if (nonnull_result)
+					conn->flags |= IGNORE_RESULT;
+			}
+			else
+				assert(nonnull_result);
+		}
+		else {
+			assert(!result);
+			h2o_timeout_unlink(&conn->timeout);
+			h2o_socket_read_stop(conn->sock);
+			break;
+		}
 	}
 
-	on_database_error(db_conn, DB_ERROR);
+	process_queries(conn);
 }
 
-static void on_database_timeout(h2o_timeout_entry_t *entry)
+static void on_database_timeout(h2o_timeout_entry_t *timeout)
 {
-	db_conn_t * const db_conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, h2o_timeout_entry, entry);
+	db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, timeout, timeout);
 
 	ERROR(DB_TIMEOUT_ERROR);
 
-	if (db_conn->param) {
-		db_conn->param->on_timeout(db_conn->param);
-		db_conn->param = NULL;
+	if (conn->queries.head) {
+		db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER(db_query_param_t,
+		                                                        l,
+		                                                        conn->queries.head);
+
+		conn->queries.head = param->l.next;
+		param->on_timeout(param);
 	}
 
-	start_database_connect(db_conn->ctx, db_conn);
+	on_database_error(conn, DB_TIMEOUT_ERROR);
 }
 
-static void on_database_write_ready(h2o_socket_t *db_sock, const char *err)
+static void on_database_write_ready(h2o_socket_t *sock, const char *err)
 {
-	db_conn_t * const db_conn = db_sock->data;
+	db_conn_t * const conn = sock->data;
 
 	if (err) {
 		ERROR(err);
-		on_database_error(db_conn, DB_ERROR);
+		on_database_error(conn, err);
 	}
 	else {
-		const int send_status = PQflush(db_conn->conn);
+		const int send_status = PQflush(conn->conn);
 
-		if (!send_status) {
-			if (db_conn->flags & IS_WRITING && db_conn->param)
-				do_database_write(db_conn);
+		if (send_status < 0) {
+			LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
+			on_database_error(conn, DB_ERROR);
 		}
-		else if (send_status < 0) {
-			LIBRARY_ERROR("PQflush", PQerrorMessage(db_conn->conn));
-			on_database_error(db_conn, DB_ERROR);
+		else {
+			if (send_status)
+				h2o_socket_notify_write(conn->sock, on_database_write_ready);
+
+			process_queries(conn);
 		}
-		else
-			h2o_socket_notify_write(db_conn->sock, on_database_write_ready);
 	}
 }
 
-static void poll_database_connection(h2o_socket_t *db_sock, const char *err)
+static void poll_database_connection(h2o_socket_t *sock, const char *err)
 {
-	db_conn_t * const db_conn = db_sock->data;
+	db_conn_t * const conn = sock->data;
 
 	if (err)
 		ERROR(err);
 	else {
-		const PostgresPollingStatusType status = db_conn->flags & IS_RESETTING ?
-		                                         PQresetPoll(db_conn->conn) :
-		                                         PQconnectPoll(db_conn->conn);
+		const PostgresPollingStatusType status = conn->flags & IS_RESETTING ?
+		                                         PQresetPoll(conn->conn) :
+		                                         PQconnectPoll(conn->conn);
+		const int sd = PQsocket(conn->conn);
 
 		switch (status) {
 			case PGRES_POLLING_WRITING:
-				if (!h2o_socket_is_writing(db_conn->sock))
-					h2o_socket_notify_write(db_conn->sock, poll_database_connection);
+				h2o_socket_read_stop(conn->sock);
+
+				if (sd != conn->sd) {
+					h2o_socket_t * const sock = create_socket(sd, conn->pool->loop);
+
+					if (!sock)
+						break;
+
+					h2o_socket_close(conn->sock);
+					conn->sd = sd;
+					conn->sock = sock;
+				}
+
+				if (!h2o_socket_is_writing(conn->sock))
+					h2o_socket_notify_write(conn->sock, poll_database_connection);
 
-				h2o_socket_read_stop(db_conn->sock);
 				return;
 			case PGRES_POLLING_OK:
-				if (PQsetnonblocking(db_conn->conn, 1)) {
-					LIBRARY_ERROR("PQsetnonblocking", PQerrorMessage(db_conn->conn));
+				h2o_timeout_unlink(&conn->timeout);
+				h2o_socket_read_stop(conn->sock);
+
+				if (PQsetnonblocking(conn->conn, 1)) {
+					LIBRARY_ERROR("PQsetnonblocking", PQerrorMessage(conn->conn));
+					break;
+				}
+
+				if (!PQenterPipelineMode(conn->conn)) {
+					ERROR("PQenterPipelineMode");
 					break;
 				}
 
-				h2o_timeout_unlink(&db_conn->h2o_timeout_entry);
-				h2o_socket_read_stop(db_conn->sock);
-				process_query(db_conn);
+				if (sd != conn->sd) {
+					h2o_socket_t * const sock = create_socket(sd, conn->pool->loop);
+
+					if (!sock)
+						break;
+
+					h2o_socket_close(conn->sock);
+					conn->sd = sd;
+					conn->sock = sock;
+				}
+
+				prepare_statements(conn);
 				return;
 			case PGRES_POLLING_READING:
-				h2o_socket_read_start(db_conn->sock, poll_database_connection);
+				if (sd != conn->sd) {
+					h2o_socket_t * const sock = create_socket(sd, conn->pool->loop);
+
+					if (!sock)
+						break;
+
+					h2o_socket_read_stop(conn->sock);
+					h2o_socket_close(conn->sock);
+					conn->sd = sd;
+					conn->sock = sock;
+				}
+
+				h2o_socket_read_start(conn->sock, poll_database_connection);
 				return;
 			default:
-				ERROR(PQerrorMessage(db_conn->conn));
+				ERROR(PQerrorMessage(conn->conn));
 		}
 	}
 
-	on_database_connect_error(db_conn, false, DB_ERROR);
+	on_database_connect_error(conn, false, DB_ERROR);
 }
 
-static void process_query(db_conn_t *db_conn)
+static void prepare_statements(db_conn_t *conn)
 {
-	if (db_conn->prepared_statement) {
-		const prepared_statement_t * const p = H2O_STRUCT_FROM_MEMBER(prepared_statement_t,
-		                                                              l,
-		                                                              db_conn->prepared_statement);
-
-		if (PQsendPrepare(db_conn->conn, p->name, p->query, 0, NULL)) {
-			db_conn->prepared_statement = p->l.next;
-			db_conn->h2o_timeout_entry.cb = on_database_connect_timeout;
-			h2o_timeout_link(db_conn->ctx->event_loop.h2o_ctx.loop,
-			                 &db_conn->ctx->db_state.h2o_timeout,
-			                 &db_conn->h2o_timeout_entry);
-			h2o_socket_read_start(db_conn->sock, on_database_read_ready);
-			on_database_write_ready(db_conn->sock, NULL);
-		}
-		else {
-			LIBRARY_ERROR("PQsendPrepare", PQerrorMessage(db_conn->conn));
-			on_database_connect_error(db_conn, false, DB_ERROR);
+	if (conn->prepared_statement) {
+		const list_t *iter = conn->prepared_statement;
+
+		do {
+			const prepared_statement_t * const p = H2O_STRUCT_FROM_MEMBER(prepared_statement_t,
+			                                                              l,
+			                                                              iter);
+
+			if (!PQsendPrepare(conn->conn, p->name, p->query, 0, NULL)) {
+				LIBRARY_ERROR("PQsendPrepare", PQerrorMessage(conn->conn));
+				on_database_connect_error(conn, false, DB_ERROR);
+				return;
+			}
+
+			iter = iter->next;
+		} while (iter);
+
+		if (!PQpipelineSync(conn->conn)) {
+			LIBRARY_ERROR("PQpipelineSync", PQerrorMessage(conn->conn));
+			on_database_connect_error(conn, false, DB_ERROR);
+			return;
 		}
+
+		conn->prepared_statement = NULL;
+		conn->timeout.cb = on_database_connect_timeout;
+		h2o_timeout_link(conn->pool->loop, &conn->pool->timeout, &conn->timeout);
+		h2o_socket_read_start(conn->sock, on_database_connect_read_ready);
+		on_database_connect_write_ready(conn->sock, NULL);
 	}
-	else if (db_conn->ctx->db_state.query_num) {
-		db_conn->ctx->db_state.query_num--;
+	else
+		process_queries(conn);
+}
 
-		if (db_conn->ctx->db_state.queries.tail == &db_conn->ctx->db_state.queries.head->next) {
-			assert(!db_conn->ctx->db_state.query_num);
-			db_conn->ctx->db_state.queries.tail = &db_conn->ctx->db_state.queries.head;
+static void process_queries(db_conn_t *conn)
+{
+	while (conn->query_num && conn->pool->queries.head) {
+		db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER(db_query_param_t,
+		                                                        l,
+		                                                        conn->pool->queries.head);
+
+		if (++conn->pool->query_num == conn->pool->config->max_query_num) {
+			assert(conn->pool->queries.tail == &param->l.next);
+			conn->pool->queries.tail = &conn->pool->queries.head;
 		}
 
-		db_conn->param = H2O_STRUCT_FROM_MEMBER(db_query_param_t,
-		                                        l,
-		                                        db_conn->ctx->db_state.queries.head);
-		db_conn->ctx->db_state.queries.head = db_conn->ctx->db_state.queries.head->next;
-		do_execute_query(db_conn, false);
+		conn->pool->queries.head = param->l.next;
+
+		if (do_execute_query(conn, param)) {
+			param->on_error(param, DB_ERROR);
+
+			if (PQstatus(conn->conn) != CONNECTION_OK) {
+				on_database_error(conn, DB_ERROR);
+				return;
+			}
+		}
 	}
-	else {
-		db_conn->l.next = db_conn->ctx->db_state.db_conn;
-		db_conn->ctx->db_state.db_conn = &db_conn->l;
-		db_conn->ctx->db_state.free_db_conn_num++;
+
+	if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IGNORE_RESULT))) {
+		conn->l.next = conn->pool->conn;
+		conn->pool->conn = &conn->l;
 	}
 }
 
-static void start_database_connect(thread_context_t *ctx, db_conn_t *db_conn)
+static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn)
 {
-	if (db_conn) {
-		db_conn->flags = IS_RESETTING;
-		h2o_timeout_unlink(&db_conn->h2o_timeout_entry);
-		h2o_socket_read_stop(db_conn->sock);
-		h2o_socket_close(db_conn->sock);
-
-		if (!PQresetStart(db_conn->conn)) {
-			LIBRARY_ERROR("PQresetStart", PQerrorMessage(db_conn->conn));
+	if (conn) {
+		PGconn * const c = conn->conn;
+
+		h2o_timeout_unlink(&conn->timeout);
+		h2o_socket_read_stop(conn->sock);
+		h2o_socket_close(conn->sock);
+
+		if (!PQresetStart(c)) {
+			LIBRARY_ERROR("PQresetStart", PQerrorMessage(c));
 			goto error_dup;
 		}
+
+		memset(conn, 0, sizeof(*conn));
+		conn->conn = c;
+		conn->flags = IS_RESETTING;
 	}
 	else {
-		ctx->db_state.db_conn_num++;
-		db_conn = h2o_mem_alloc(sizeof(*db_conn));
-		memset(db_conn, 0, sizeof(*db_conn));
-
-		const char * const conninfo = ctx->config->db_host ? ctx->config->db_host : "";
-
-		db_conn->conn = PQconnectStart(conninfo);
+		assert(pool->conn_num);
+		pool->conn_num--;
+		conn = h2o_mem_alloc(sizeof(*conn));
+		memset(conn, 0, sizeof(*conn));
+		conn->conn = PQconnectStart(pool->conninfo);
 
-		if (!db_conn->conn) {
+		if (!conn->conn) {
 			errno = ENOMEM;
 			STANDARD_ERROR("PQconnectStart");
 			goto error_connect;
 		}
 
-		if (PQstatus(db_conn->conn) == CONNECTION_BAD) {
-			LIBRARY_ERROR("PQstatus", PQerrorMessage(db_conn->conn));
+		if (PQstatus(conn->conn) == CONNECTION_BAD) {
+			LIBRARY_ERROR("PQstatus", PQerrorMessage(conn->conn));
 			goto error_dup;
 		}
 	}
 
-	const int sd = dup(PQsocket(db_conn->conn));
-
-	if (sd < 0) {
-		STANDARD_ERROR("dup");
-		goto error_dup;
-	}
-
-	const int flags = fcntl(sd, F_GETFD);
-
-	if (flags < 0 || fcntl(sd, F_SETFD, flags | FD_CLOEXEC)) {
-		STANDARD_ERROR("fcntl");
-		goto error_fcntl;
-	}
-
-	db_conn->sock = h2o_evloop_socket_create(ctx->event_loop.h2o_ctx.loop,
-	                                         sd,
-	                                         H2O_SOCKET_FLAG_DONT_READ);
-
-	if (db_conn->sock) {
-		db_conn->sock->data = db_conn;
-		db_conn->ctx = ctx;
-		db_conn->h2o_timeout_entry.cb = on_database_connect_timeout;
-		db_conn->prepared_statement = ctx->global_data->prepared_statements;
-		h2o_timeout_link(ctx->event_loop.h2o_ctx.loop,
-		                 &ctx->db_state.h2o_timeout,
-		                 &db_conn->h2o_timeout_entry);
-		h2o_socket_notify_write(db_conn->sock, poll_database_connection);
+	conn->sd = PQsocket(conn->conn);
+	conn->sock = create_socket(conn->sd, pool->loop);
+
+	if (conn->sock) {
+		conn->sock->data = conn;
+		conn->pool = pool;
+		conn->prepared_statement = pool->prepared_statements;
+		conn->queries.tail = &conn->queries.head;
+		conn->query_num = pool->config->max_pipeline_query_num;
+		conn->timeout.cb = on_database_connect_timeout;
+		h2o_timeout_link(pool->loop, &pool->timeout, &conn->timeout);
+		h2o_socket_notify_write(conn->sock, poll_database_connection);
 		return;
 	}
 
-	errno = ENOMEM;
-	STANDARD_ERROR("h2o_evloop_socket_create");
-error_fcntl:
-	close(sd);
 error_dup:
-	PQfinish(db_conn->conn);
+	PQfinish(conn->conn);
 error_connect:
-	free(db_conn);
-	error_notification(ctx, false, DB_ERROR);
+	free(conn);
+	error_notification(pool, false, DB_ERROR);
 }
 
 void add_prepared_statement(const char *name, const char *query, list_t **prepared_statements)
@@ -465,27 +634,36 @@ void add_prepared_statement(const char *name, const char *query, list_t **prepar
 	*prepared_statements = &p->l;
 }
 
-int execute_query(thread_context_t *ctx, db_query_param_t *param)
+int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param)
 {
 	int ret = 1;
 
-	if (ctx->db_state.free_db_conn_num) {
-		db_conn_t * const db_conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, ctx->db_state.db_conn);
+	if (pool->conn) {
+		db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn);
+
+		assert(!conn->queries.head);
+		assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT)));
+		pool->conn = conn->l.next;
+		ret = do_execute_query(conn, param);
 
-		ctx->db_state.db_conn = db_conn->l.next;
-		ctx->db_state.free_db_conn_num--;
-		db_conn->param = param;
-		ret = do_execute_query(db_conn, true);
+		if (ret) {
+			if (PQstatus(conn->conn) == CONNECTION_OK) {
+				conn->l.next = pool->conn;
+				pool->conn = &conn->l;
+			}
+			else
+				start_database_connect(conn->pool, conn);
+		}
 	}
-	else if (ctx->db_state.query_num < ctx->config->max_query_num) {
-		if (ctx->db_state.db_conn_num < ctx->config->max_db_conn_num)
-			start_database_connect(ctx, NULL);
+	else if (pool->query_num) {
+		if (pool->conn_num)
+			start_database_connect(pool, NULL);
 
-		if (ctx->db_state.db_conn_num) {
+		if (pool->conn_num < pool->config->max_db_conn_num && pool->query_num) {
 			param->l.next = NULL;
-			*ctx->db_state.queries.tail = &param->l;
-			ctx->db_state.queries.tail = &param->l.next;
-			ctx->db_state.query_num++;
+			*pool->queries.tail = &param->l;
+			pool->queries.tail = &param->l.next;
+			pool->query_num--;
 			ret = 0;
 		}
 	}
@@ -493,31 +671,48 @@ int execute_query(thread_context_t *ctx, db_query_param_t *param)
 	return ret;
 }
 
-void free_database_state(h2o_loop_t *loop, db_state_t *db_state)
+void free_database_connection_pool(db_conn_pool_t *pool)
 {
-	assert(!db_state->query_num && db_state->free_db_conn_num == db_state->db_conn_num);
+	assert(!pool->queries.head);
+	assert(pool->query_num == pool->config->max_query_num);
 
-	list_t *iter = db_state->db_conn;
+	size_t num = 0;
 
-	if (iter)
+	if (pool->conn)
 		do {
-			db_conn_t * const db_conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, iter);
-
-			iter = iter->next;
-			assert(!db_conn->param && !h2o_timeout_is_linked(&db_conn->h2o_timeout_entry));
-			h2o_socket_close(db_conn->sock);
-			PQfinish(db_conn->conn);
-			free(db_conn);
-		} while (iter);
-
-	h2o_timeout_dispose(loop, &db_state->h2o_timeout);
+			db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn);
+
+			assert(!conn->queries.head);
+			assert(conn->query_num == pool->config->max_pipeline_query_num);
+			assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT)));
+			assert(!h2o_timeout_is_linked(&conn->timeout));
+			h2o_socket_read_stop(conn->sock);
+			h2o_socket_close(conn->sock);
+			PQfinish(conn->conn);
+			pool->conn = pool->conn->next;
+			free(conn);
+			num++;
+		} while (pool->conn);
+
+	assert(num + pool->conn_num == pool->config->max_db_conn_num);
+	h2o_timeout_dispose(pool->loop, &pool->timeout);
 }
 
-void initialize_database_state(h2o_loop_t *loop, db_state_t *db_state)
+void initialize_database_connection_pool(const char *conninfo,
+                                         const struct config_t *config,
+                                         const list_t *prepared_statements,
+                                         h2o_loop_t *loop,
+                                         db_conn_pool_t *pool)
 {
-	memset(db_state, 0, sizeof(*db_state));
-	db_state->queries.tail = &db_state->queries.head;
-	h2o_timeout_init(loop, &db_state->h2o_timeout, H2O_DEFAULT_HTTP1_REQ_TIMEOUT);
+	memset(pool, 0, sizeof(*pool));
+	pool->config = config;
+	pool->conninfo = conninfo ? conninfo : "";
+	pool->loop = loop;
+	pool->prepared_statements = prepared_statements;
+	pool->queries.tail = &pool->queries.head;
+	pool->conn_num = config->max_db_conn_num;
+	pool->query_num = config->max_query_num;
+	h2o_timeout_init(loop, &pool->timeout, config->db_timeout * MS_IN_S);
 }
 
 void remove_prepared_statements(list_t *prepared_statements)

+ 23 - 20
frameworks/C/h2o/src/database.h

@@ -25,57 +25,60 @@
 #include <stdint.h>
 #include <postgresql/libpq-fe.h>
 
-#include "global_data.h"
 #include "list.h"
 
 #define DB_ERROR "database error\n"
 #define DB_REQ_ERROR "too many concurrent database requests\n"
 #define DB_TIMEOUT_ERROR "database timeout\n"
 #define IS_PREPARED 1
-#define IS_SINGLE_ROW 2
 
 typedef enum {
 	SUCCESS,
 	DONE,
-	WANT_WRITE
 } result_return_t;
 
-typedef struct thread_context_t thread_context_t;
+struct config_t;
+struct db_query_param_t;
 
-typedef struct db_query_param_t db_query_param_t;
+typedef result_return_t (*on_result_t)(struct db_query_param_t *, PGresult *);
 
-typedef result_return_t (*on_result_t)(db_query_param_t *, PGresult *);
-
-struct db_query_param_t {
+typedef struct db_query_param_t {
 	list_t l;
-	void (*on_error)(db_query_param_t *, const char *);
+	void (*on_error)(struct db_query_param_t *, const char *);
 	on_result_t on_result;
-	void (*on_timeout)(db_query_param_t *);
-	int (*on_write_ready)(db_query_param_t *, PGconn *);
+	void (*on_timeout)(struct db_query_param_t *);
 	const char *command;
 	const char * const *paramValues;
 	const int *paramLengths;
 	const int *paramFormats;
+	const Oid *paramTypes;
 	size_t nParams;
 	uint_fast32_t flags;
 	int resultFormat;
-};
+} db_query_param_t;
 
 typedef struct {
-	list_t *db_conn;
+	const struct config_t *config;
+	list_t *conn;
+	const char *conninfo;
+	h2o_loop_t *loop;
+	const list_t *prepared_statements;
 	// We use a FIFO queue instead of a simpler stack, otherwise the earlier queries may wait
 	// an unbounded amount of time to be executed.
 	queue_t queries;
-	size_t db_conn_num;
-	size_t free_db_conn_num;
+	size_t conn_num;
 	size_t query_num;
-	h2o_timeout_t h2o_timeout;
-} db_state_t;
+	h2o_timeout_t timeout;
+} db_conn_pool_t;
 
 void add_prepared_statement(const char *name, const char *query, list_t **prepared_statements);
-int execute_query(thread_context_t *ctx, db_query_param_t *param);
-void free_database_state(h2o_loop_t *loop, db_state_t *db_state);
-void initialize_database_state(h2o_loop_t *loop, db_state_t *db_state);
+int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param);
+void free_database_connection_pool(db_conn_pool_t *pool);
+void initialize_database_connection_pool(const char *conninfo,
+                                         const struct config_t *config,
+                                         const list_t *prepared_statements,
+                                         h2o_loop_t *loop,
+                                         db_conn_pool_t *pool);
 void remove_prepared_statements(list_t *prepared_statements);
 
 #endif // DATABASE_H_

+ 5 - 4
frameworks/C/h2o/src/event_loop.c

@@ -62,7 +62,7 @@ static void accept_connection(h2o_socket_t *listener, const char *err)
 		                                                      listener->data);
 
 		if (!ctx->shutdown) {
-			size_t accepted = ctx->config->max_accept;
+			size_t accepted = ctx->global_thread_data->config->max_accept;
 
 			assert(accepted);
 
@@ -232,12 +232,13 @@ static void shutdown_server(h2o_socket_t *listener, const char *err)
 			ctx->event_loop.h2o_socket = NULL;
 		}
 
-		for (size_t i = ctx->config->thread_num - 1; i > 0; i--) {
+		for (size_t i = ctx->global_thread_data->config->thread_num - 1; i > 0; i--) {
 			message_t * const msg = h2o_mem_alloc(sizeof(*msg));
 
 			memset(msg, 0, sizeof(*msg));
 			msg->type = SHUTDOWN;
-			h2o_multithread_send_message(&ctx->global_thread_data[i].h2o_receiver, &msg->super);
+			h2o_multithread_send_message(&ctx->global_thread_data[i].h2o_receiver,
+			                             &msg->super);
 		}
 	}
 }
@@ -265,7 +266,7 @@ static void start_accept_polling(const config_t *config,
 	h2o_socket_read_start(h2o_socket, accept_cb);
 }
 
-void event_loop(thread_context_t *ctx)
+void event_loop(struct thread_context_t *ctx)
 {
 	while (!ctx->shutdown || ctx->event_loop.conn_num)
 		h2o_evloop_run(ctx->event_loop.h2o_ctx.loop, INT32_MAX);

+ 2 - 2
frameworks/C/h2o/src/event_loop.h

@@ -31,7 +31,7 @@ typedef enum {
 	SHUTDOWN
 } message_type_t;
 
-typedef struct thread_context_t thread_context_t;
+struct thread_context_t;
 
 typedef struct {
 	h2o_socket_t *h2o_https_socket;
@@ -46,7 +46,7 @@ typedef struct {
 	h2o_multithread_message_t super;
 } message_t;
 
-void event_loop(thread_context_t *ctx);
+void event_loop(struct thread_context_t *ctx);
 void free_event_loop(event_loop_t *event_loop, h2o_multithread_receiver_t *h2o_receiver);
 void initialize_event_loop(bool is_main_thread,
                            global_data_t *global_data,

+ 3 - 2
frameworks/C/h2o/src/global_data.h

@@ -32,7 +32,7 @@
 struct global_thread_data_t;
 struct thread_context_t;
 
-typedef struct {
+typedef struct config_t {
 	const char *bind_address;
 	const char *cert;
 	const char *db_host;
@@ -40,9 +40,11 @@ typedef struct {
 	const char *log;
 	const char *root;
 	const char *template_path;
+	size_t db_timeout;
 	size_t max_accept;
 	size_t max_db_conn_num;
 	size_t max_json_generator;
+	size_t max_pipeline_query_num;
 	size_t max_query_num;
 	size_t thread_num;
 	uint16_t https_port;
@@ -53,7 +55,6 @@ typedef struct {
 	h2o_logger_t *file_logger;
 	struct global_thread_data_t *global_thread_data;
 	list_t *postinitialization_tasks;
-	list_t *prepared_statements;
 	h2o_socket_t *signals;
 	SSL_CTX *ssl_ctx;
 	size_t memory_alignment;

+ 18 - 13
frameworks/C/h2o/src/handlers/fortune.c

@@ -214,7 +214,7 @@ static int fortunes(struct st_h2o_handler_t *self, h2o_req_t *req)
 	fortune_ctx->req = req;
 	fortune_ctx->result = &fortune->l;
 
-	if (execute_query(ctx, &fortune_ctx->param)) {
+	if (execute_database_query(&ctx->request_handler_data.hello_world_db, &fortune_ctx->param)) {
 		fortune_ctx->cleanup = true;
 		send_service_unavailable_error(DB_REQ_ERROR, req);
 	}
@@ -284,9 +284,10 @@ static result_return_t on_fortune_result(db_query_param_t *param, PGresult *resu
 		fortune_ctx->iovec_list_iter = iovec_list;
 		fortune_ctx->result = sort_list(fortune_ctx->result, compare_fortunes);
 
-		if (mustache_render(&api,
-		                    fortune_ctx,
-		                    ctx->global_data->request_handler_data.fortunes_template)) {
+		struct mustache_token_t * const fortunes_template =
+			ctx->global_thread_data->global_data->request_handler_data.fortunes_template;
+
+		if (mustache_render(&api, fortune_ctx, fortunes_template)) {
 			fortune_ctx->iovec_list = iovec_list->l.next;
 			set_default_response_param(HTML, fortune_ctx->content_length, fortune_ctx->req);
 			h2o_start_response(fortune_ctx->req, &fortune_ctx->generator);
@@ -422,25 +423,29 @@ static void template_error(mustache_api_t *api,
 	print_error(template_input->name, lineno, "mustache_compile", error);
 }
 
-void cleanup_fortunes_handler(global_data_t *global_data)
+void cleanup_fortunes_handler(request_handler_data_t *data)
 {
-	if (global_data->request_handler_data.fortunes_template) {
+	if (data->fortunes_template) {
 		mustache_api_t api = {.freedata = NULL};
 
-		mustache_free(&api, global_data->request_handler_data.fortunes_template);
+		mustache_free(&api, data->fortunes_template);
 	}
 }
 
 void initialize_fortunes_handler(const config_t *config,
-                                 global_data_t *global_data,
                                  h2o_hostconf_t *hostconf,
-                                 h2o_access_log_filehandle_t *log_handle)
+                                 h2o_access_log_filehandle_t *log_handle,
+                                 request_handler_data_t *data)
 {
 	mustache_template_t *template = NULL;
-	const size_t template_path_prefix_len = config->template_path ? strlen(config->template_path) : 0;
+	const size_t template_path_prefix_len = config->template_path ?
+	                                        strlen(config->template_path) :
+	                                        0;
 	char path[template_path_prefix_len + sizeof(TEMPLATE_PATH_SUFFIX)];
 
-	memcpy(path, config->template_path, template_path_prefix_len);
+	if (template_path_prefix_len)
+		memcpy(path, config->template_path, template_path_prefix_len);
+
 	memcpy(path + template_path_prefix_len, TEMPLATE_PATH_SUFFIX, sizeof(TEMPLATE_PATH_SUFFIX));
 
 	template_input_t template_input = {.input = fopen(path, "rb"), .name = path};
@@ -465,10 +470,10 @@ void initialize_fortunes_handler(const config_t *config,
 		STANDARD_ERROR("fopen");
 
 	if (template) {
-		global_data->request_handler_data.fortunes_template = template;
+		data->fortunes_template = template;
 		add_prepared_statement(FORTUNE_TABLE_NAME,
 		                       FORTUNE_QUERY,
-		                       &global_data->prepared_statements);
+		                       &data->prepared_statements);
 		register_request_handler("/fortunes", fortunes, hostconf, log_handle);
 	}
 }

+ 4 - 3
frameworks/C/h2o/src/handlers/fortune.h

@@ -24,11 +24,12 @@
 #include <h2o.h>
 
 #include "global_data.h"
+#include "request_handler_data.h"
 
-void cleanup_fortunes_handler(global_data_t *global_data);
+void cleanup_fortunes_handler(request_handler_data_t *data);
 void initialize_fortunes_handler(const config_t *config,
-                                 global_data_t *global_data,
                                  h2o_hostconf_t *hostconf,
-                                 h2o_access_log_filehandle_t *log_handle);
+                                 h2o_access_log_filehandle_t *log_handle,
+                                 request_handler_data_t *data);
 
 #endif // FORTUNE_H_

+ 7 - 4
frameworks/C/h2o/src/handlers/request_handler_data.h

@@ -21,18 +21,21 @@
 
 #define REQUEST_HANDLER_DATA_H_
 
-#include "cache.h"
+#include <h2o/cache.h>
+
+#include "database.h"
+#include "list.h"
 
 struct mustache_token_t;
 
 typedef struct {
 	struct mustache_token_t *fortunes_template;
-	cache_t world_cache;
+	list_t *prepared_statements;
 } request_handler_data_t;
 
 typedef struct {
-	// Replace with any actual fields; structures without members cause compiler warnings.
-	int pad;
+	h2o_cache_t *world_cache;
+	db_conn_pool_t hello_world_db;
 } request_handler_thread_data_t;
 
 #endif // REQUEST_HANDLER_DATA_H_

+ 110 - 89
frameworks/C/h2o/src/handlers/world.c

@@ -33,7 +33,6 @@
 #include <yajl/yajl_gen.h>
 
 #include "bitset.h"
-#include "cache.h"
 #include "database.h"
 #include "error.h"
 #include "global_data.h"
@@ -76,8 +75,10 @@ typedef struct multiple_query_ctx_t multiple_query_ctx_t;
 typedef struct update_ctx_t update_ctx_t;
 
 typedef struct {
-	thread_context_t *ctx;
+	request_handler_thread_data_t *data;
+	h2o_loop_t *loop;
 	db_query_param_t param;
+	h2o_iovec_t table;
 } populate_cache_ctx_t;
 
 typedef struct {
@@ -125,7 +126,9 @@ static int compare_items(const void *x, const void *y);
 static void complete_multiple_query(multiple_query_ctx_t *query_ctx);
 static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req);
 static void do_updates(multiple_query_ctx_t *query_ctx);
-static void fetch_from_cache(uint64_t now, cache_t *cache, multiple_query_ctx_t *query_ctx);
+static void fetch_from_cache(uint64_t now,
+                             request_handler_thread_data_t *data,
+                             multiple_query_ctx_t *query_ctx);
 static void free_cache_entry(h2o_iovec_t value);
 static size_t get_query_number(h2o_req_t *req);
 static void initialize_ids(size_t num_query, query_result_t *res, unsigned int *seed);
@@ -140,7 +143,6 @@ static void on_single_query_error(db_query_param_t *param, const char *error_str
 static result_return_t on_single_query_result(db_query_param_t *param, PGresult *result);
 static void on_single_query_timeout(db_query_param_t *param);
 static result_return_t on_update_result(db_query_param_t *param, PGresult *result);
-static void populate_cache(thread_context_t *ctx, void *arg);
 static void process_result(PGresult *result, size_t idx, query_result_t *out);
 static int serialize_item(uint32_t id, uint32_t random_number, yajl_gen gen);
 static void serialize_items(const query_result_t *res,
@@ -163,7 +165,7 @@ static void cleanup_multiple_query(multiple_query_ctx_t *query_ctx)
 		free_json_generator(query_ctx->gen,
 		                    &query_ctx->ctx->json_generator,
 		                    &query_ctx->ctx->json_generator_num,
-		                    query_ctx->ctx->config->max_json_generator);
+		                    query_ctx->ctx->global_thread_data->config->max_json_generator);
 
 	free(query_ctx);
 }
@@ -243,7 +245,9 @@ static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req)
 	base_size = ((base_size + _Alignof(query_param_t) - 1) / _Alignof(query_param_t));
 	base_size = base_size * _Alignof(query_param_t);
 
-	const size_t num_query_in_progress = MIN(num_query, ctx->config->max_db_conn_num);
+	const config_t * const config = ctx->global_thread_data->config;
+	const size_t num_query_in_progress =
+		MIN(num_query, config->max_db_conn_num * config->max_pipeline_query_num);
 	size_t sz = base_size + num_query_in_progress * sizeof(query_param_t);
 
 	if (do_update) {
@@ -273,7 +277,7 @@ static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req)
 	if (use_cache) {
 		query_ctx->flags |= USE_CACHE;
 		fetch_from_cache(h2o_now(ctx->event_loop.h2o_ctx.loop),
-		                 &ctx->global_data->request_handler_data.world_cache,
+		                 &ctx->request_handler_data,
 		                 query_ctx);
 
 		if (query_ctx->num_result == query_ctx->num_query) {
@@ -307,7 +311,8 @@ static int do_multiple_queries(bool do_update, bool use_cache, h2o_req_t *req)
 	}
 
 	for (size_t i = 0; i < query_ctx->num_query_in_progress; i++)
-		if (execute_query(ctx, &query_ctx->query_param[i].param)) {
+		if (execute_database_query(&ctx->request_handler_data.hello_world_db,
+		                           &query_ctx->query_param[i].param)) {
 			query_ctx->num_query_in_progress = i;
 			query_ctx->flags |= DO_CLEANUP;
 			send_service_unavailable_error(DB_REQ_ERROR, req);
@@ -366,7 +371,8 @@ static void do_updates(multiple_query_ctx_t *query_ctx)
 	if ((size_t) c >= sz)
 		goto error;
 
-	if (execute_query(query_ctx->ctx, &query_ctx->query_param->param)) {
+	if (execute_database_query(&query_ctx->ctx->request_handler_data.hello_world_db,
+	                           &query_ctx->query_param->param)) {
 		query_ctx->flags |= DO_CLEANUP;
 		send_service_unavailable_error(DB_REQ_ERROR, query_ctx->req);
 	}
@@ -380,22 +386,49 @@ error:
 	send_error(INTERNAL_SERVER_ERROR, REQ_ERROR, query_ctx->req);
 }
 
-static void fetch_from_cache(uint64_t now, cache_t *cache, multiple_query_ctx_t *query_ctx)
+static void fetch_from_cache(uint64_t now,
+                             request_handler_thread_data_t *data,
+                             multiple_query_ctx_t *query_ctx)
 {
-	h2o_iovec_t key = {.len = sizeof(query_ctx->res->id)};
+	if (data->world_cache) {
+		const h2o_iovec_t key = {.base = WORLD_TABLE_NAME, .len = sizeof(WORLD_TABLE_NAME) - 1};
+		h2o_cache_ref_t * const r = h2o_cache_fetch(data->world_cache, now, key, 0);
 
-	for (size_t i = 0; i < query_ctx->num_query; i++) {
-		key.base = (char *) &query_ctx->res[i].id;
+		if (r) {
+			const uint32_t * const table = (const uint32_t *) r->value.base;
 
-		const h2o_cache_hashcode_t keyhash = h2o_cache_calchash(key.base, key.len);
-		h2o_cache_ref_t * const r = cache_fetch(cache, now, key, keyhash);
+			for (size_t i = 0; i < query_ctx->num_query; i++) {
+				const uint32_t id = query_ctx->res[i].id;
 
-		if (r) {
-			query_ctx->res[i].id = query_ctx->res[query_ctx->num_result].id;
-			memcpy(query_ctx->res + query_ctx->num_result++,
-			       r->value.base,
-			       sizeof(*query_ctx->res));
-			cache_release(cache, r, keyhash);
+				memset(query_ctx->res + i, 0, sizeof(*query_ctx->res));
+				query_ctx->res[i].id = id;
+				assert(id <= MAX_ID);
+				query_ctx->res[i].random_number = table[id];
+			}
+
+			query_ctx->num_result = query_ctx->num_query;
+			h2o_cache_release(data->world_cache, r);
+		}
+	}
+	else {
+		data->world_cache = h2o_cache_create(0, CACHE_CAPACITY, CACHE_DURATION, free_cache_entry);
+
+		if (data->world_cache) {
+			populate_cache_ctx_t * const ctx = h2o_mem_alloc(sizeof(*ctx));
+
+			memset(ctx, 0, sizeof(*ctx));
+			ctx->data = data;
+			ctx->loop = query_ctx->ctx->event_loop.h2o_ctx.loop;
+			ctx->table.len = (MAX_ID + 1) * sizeof(uint32_t);
+			ctx->table.base = h2o_mem_alloc(ctx->table.len);
+			memset(ctx->table.base, 0, ctx->table.len);
+			ctx->param.command = POPULATE_CACHE_QUERY;
+			ctx->param.on_error = on_populate_cache_error;
+			ctx->param.on_result = on_populate_cache_result;
+			ctx->param.on_timeout = on_populate_cache_timeout;
+
+			if (execute_database_query(&data->hello_world_db, &ctx->param))
+				on_populate_cache_error(&ctx->param, NULL);
 		}
 	}
 }
@@ -482,20 +515,6 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
 	else if (PQresultStatus(result) == PGRES_TUPLES_OK) {
 		assert(PQntuples(result) == 1);
 		process_result(result, 0, query_ctx->res + query_ctx->num_result);
-
-		if (query_ctx->flags & USE_CACHE) {
-			query_result_t * const r = h2o_mem_alloc(sizeof(*r));
-			const h2o_iovec_t key = {.base = (char *) &r->id, .len = sizeof(r->id)};
-			const h2o_iovec_t value = {.base = (char *) r, .len = sizeof(*r)};
-
-			*r = query_ctx->res[query_ctx->num_result];
-			cache_set(h2o_now(query_ctx->ctx->event_loop.h2o_ctx.loop),
-			          key,
-			          0,
-			          value,
-			          &query_ctx->ctx->global_data->request_handler_data.world_cache);
-		}
-
 		query_ctx->num_result++;
 
 		const size_t num_query_remaining = query_ctx->num_query - query_ctx->num_result;
@@ -505,7 +524,8 @@ static result_return_t on_multiple_query_result(db_query_param_t *param, PGresul
 
 			query_param->id = htonl(query_ctx->res[idx].id);
 
-			if (execute_query(query_ctx->ctx, &query_param->param)) {
+			if (execute_database_query(&query_ctx->ctx->request_handler_data.hello_world_db,
+			                           &query_param->param)) {
 				query_ctx->flags |= DO_CLEANUP;
 				send_service_unavailable_error(DB_REQ_ERROR, query_ctx->req);
 			}
@@ -545,45 +565,53 @@ static void on_multiple_query_timeout(db_query_param_t *param)
 static void on_populate_cache_error(db_query_param_t *param, const char *error_string)
 {
 	IGNORE_FUNCTION_PARAMETER(error_string);
-	free(H2O_STRUCT_FROM_MEMBER(populate_cache_ctx_t, param, param));
-}
 
-static result_return_t on_populate_cache_result(db_query_param_t *param, PGresult *result)
-{
 	populate_cache_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(populate_cache_ctx_t,
 	                                                                param,
 	                                                                param);
 
+	h2o_cache_destroy(query_ctx->data->world_cache);
+	query_ctx->data->world_cache = NULL;
+	free(query_ctx->table.base);
+	free(query_ctx);
+}
+
+static result_return_t on_populate_cache_result(db_query_param_t *param, PGresult *result)
+{
 	if (PQresultStatus(result) == PGRES_TUPLES_OK) {
 		const size_t num_rows = PQntuples(result);
+		populate_cache_ctx_t * const query_ctx = H2O_STRUCT_FROM_MEMBER(populate_cache_ctx_t,
+		                                                                param,
+		                                                                param);
+		query_result_t r = {.id = 0};
+		uint32_t * const table = (uint32_t *) query_ctx->table.base;
 
 		for (size_t i = 0; i < num_rows; i++) {
-			query_result_t * const r = h2o_mem_alloc(sizeof(*r));
-
-			memset(r, 0, sizeof(*r));
-			process_result(result, i, r);
+			process_result(result, i, &r);
+			table[r.id] = r.random_number;
+		}
 
-			const h2o_iovec_t key = {.base = (char *) &r->id, .len = sizeof(r->id)};
-			const h2o_iovec_t value = {.base = (char *) r, .len = sizeof(*r)};
+		const h2o_iovec_t key = {.base = WORLD_TABLE_NAME, .len = sizeof(WORLD_TABLE_NAME) - 1};
 
-			cache_set(h2o_now(query_ctx->ctx->event_loop.h2o_ctx.loop),
-			          key,
-			          0,
-			          value,
-			          &query_ctx->ctx->global_data->request_handler_data.world_cache);
-		}
+		h2o_cache_set(query_ctx->data->world_cache,
+		              h2o_now(query_ctx->loop),
+		              key,
+		              0,
+		              query_ctx->table);
+		free(query_ctx);
 	}
-	else
+	else {
 		LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
+		on_populate_cache_error(param, NULL);
+	}
 
 	PQclear(result);
-	free(query_ctx);
 	return DONE;
 }
 
 static void on_populate_cache_timeout(db_query_param_t *param)
 {
-	free(H2O_STRUCT_FROM_MEMBER(populate_cache_ctx_t, param, param));
+	on_populate_cache_error(param, NULL);
 }
 
 static void on_single_query_error(db_query_param_t *param, const char *error_string)
@@ -690,23 +718,6 @@ static result_return_t on_update_result(db_query_param_t *param, PGresult *resul
 	return DONE;
 }
 
-static void populate_cache(thread_context_t *ctx, void *arg)
-{
-	IGNORE_FUNCTION_PARAMETER(arg);
-
-	populate_cache_ctx_t * const query_ctx = h2o_mem_alloc(sizeof(*query_ctx));
-
-	memset(query_ctx, 0, sizeof(*query_ctx));
-	query_ctx->ctx = ctx;
-	query_ctx->param.command = POPULATE_CACHE_QUERY;
-	query_ctx->param.on_error = on_populate_cache_error;
-	query_ctx->param.on_result = on_populate_cache_result;
-	query_ctx->param.on_timeout = on_populate_cache_timeout;
-
-	if (execute_query(ctx, &query_ctx->param))
-		free(query_ctx);
-}
-
 static void process_result(PGresult *result, size_t idx, query_result_t *out)
 {
 	assert(PQnfields(result) == 2);
@@ -811,7 +822,7 @@ static int single_query(struct st_h2o_handler_t *self, h2o_req_t *req)
 	query_ctx->param.resultFormat = 1;
 	query_ctx->req = req;
 
-	if (execute_query(ctx, &query_ctx->param)) {
+	if (execute_database_query(&ctx->request_handler_data.hello_world_db, &query_ctx->param)) {
 		query_ctx->cleanup = true;
 		send_service_unavailable_error(DB_REQ_ERROR, req);
 	}
@@ -825,27 +836,37 @@ static int updates(struct st_h2o_handler_t *self, h2o_req_t *req)
 	return do_multiple_queries(true, false, req);
 }
 
-void cleanup_world_handlers(global_data_t *global_data)
+void cleanup_world_handler_thread_data(request_handler_thread_data_t *data)
+{
+	free_database_connection_pool(&data->hello_world_db);
+
+	if (data->world_cache)
+		h2o_cache_destroy(data->world_cache);
+}
+
+void cleanup_world_handlers(request_handler_data_t *data)
 {
-	cache_destroy(&global_data->request_handler_data.world_cache);
+	remove_prepared_statements(data->prepared_statements);
 }
 
-void initialize_world_handlers(const config_t *config,
-                               global_data_t *global_data,
-                               h2o_hostconf_t *hostconf,
-                               h2o_access_log_filehandle_t *log_handle)
+void initialize_world_handler_thread_data(thread_context_t *ctx,
+                                          const request_handler_data_t *data,
+                                          request_handler_thread_data_t *thread_data)
 {
-	add_prepared_statement(WORLD_TABLE_NAME, WORLD_QUERY, &global_data->prepared_statements);
+	initialize_database_connection_pool(ctx->global_thread_data->config->db_host,
+	                                    ctx->global_thread_data->config,
+	                                    data->prepared_statements,
+	                                    ctx->event_loop.h2o_ctx.loop,
+	                                    &thread_data->hello_world_db);
+}
+
+void initialize_world_handlers(h2o_hostconf_t *hostconf,
+                               h2o_access_log_filehandle_t *log_handle,
+                               request_handler_data_t *data)
+{
+	add_prepared_statement(WORLD_TABLE_NAME, WORLD_QUERY, &data->prepared_statements);
+	register_request_handler("/cached-worlds", cached_queries, hostconf, log_handle);
 	register_request_handler("/db", single_query, hostconf, log_handle);
 	register_request_handler("/queries", multiple_queries, hostconf, log_handle);
 	register_request_handler("/updates", updates, hostconf, log_handle);
-
-	if (!cache_create(config->thread_num,
-	                  CACHE_CAPACITY,
-	                  CACHE_DURATION,
-	                  free_cache_entry,
-	                  &global_data->request_handler_data.world_cache)) {
-		add_postinitialization_task(populate_cache, NULL, &global_data->postinitialization_tasks);
-		register_request_handler("/cached-worlds", cached_queries, hostconf, log_handle);
-	}
 }

+ 12 - 6
frameworks/C/h2o/src/handlers/world.h

@@ -24,11 +24,17 @@
 #include <h2o.h>
 
 #include "global_data.h"
-
-void cleanup_world_handlers(global_data_t *global_data);
-void initialize_world_handlers(const config_t *config,
-                               global_data_t *global_data,
-                               h2o_hostconf_t *hostconf,
-                               h2o_access_log_filehandle_t *log_handle);
+#include "list.h"
+#include "request_handler_data.h"
+#include "thread.h"
+
+void cleanup_world_handler_thread_data(request_handler_thread_data_t *data);
+void cleanup_world_handlers(request_handler_data_t *data);
+void initialize_world_handler_thread_data(thread_context_t *ctx,
+                                          const request_handler_data_t *data,
+                                          request_handler_thread_data_t *thread_data);
+void initialize_world_handlers(h2o_hostconf_t *hostconf,
+                               h2o_access_log_filehandle_t *log_handle,
+                               request_handler_data_t *data);
 
 #endif // WORLD_H_

+ 39 - 13
frameworks/C/h2o/src/main.c

@@ -41,12 +41,23 @@
 #include "utility.h"
 
 #define USAGE_MESSAGE \
-	"Usage:\n%s [-a <max connections accepted simultaneously>] [-b <bind address>] " \
-	"[-c <certificate file>] [-d <database connection string>] [-f template file path] " \
-	"[-j <max reused JSON generators>] [-k <private key file>] [-l <log path>] " \
-	"[-m <max database connections per thread>] [-p <port>] " \
-	"[-q <max enqueued database queries per thread>] [-r <root directory>] " \
-	"[-s <HTTPS port>] [-t <thread number>]\n"
+	"Usage:\n%s " \
+	"[-a <max connections accepted simultaneously>] " \
+	"[-b <bind address>] " \
+	"[-c <certificate file>] " \
+	"[-d <database connection string>] " \
+	"[-e <max pipelined database queries>] " \
+	"[-f <template file path>] " \
+	"[-j <max reused JSON generators>] " \
+	"[-k <private key file>] " \
+	"[-l <log path>] " \
+	"[-m <max database connections per thread>] " \
+	"[-o <database query timeout in seconds>] " \
+	"[-p <port>] " \
+	"[-q <max enqueued database queries per thread>] " \
+	"[-r <root directory>] " \
+	"[-s <HTTPS port>] " \
+	"[-t <thread number>]\n"
 
 typedef struct {
 	list_t l;
@@ -73,8 +84,7 @@ static void free_global_data(global_data_t *global_data)
 	if (global_data->file_logger)
 		global_data->file_logger->dispose(global_data->file_logger);
 
-	cleanup_request_handlers(global_data);
-	remove_prepared_statements(global_data->prepared_statements);
+	cleanup_request_handlers(&global_data->request_handler_data);
 	h2o_config_dispose(&global_data->h2o_config);
 
 	if (global_data->ssl_ctx)
@@ -111,7 +121,11 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
 			goto error;
 	}
 
-	initialize_request_handlers(config, global_data, hostconf, log_handle);
+	initialize_request_handlers(config,
+	                            hostconf,
+	                            log_handle,
+	                            &global_data->postinitialization_tasks,
+	                            &global_data->request_handler_data);
 
 	// Must be registered after the rest of the request handlers.
 	if (config->root) {
@@ -145,7 +159,7 @@ static int parse_options(int argc, char *argv[], config_t *config)
 	opterr = 0;
 
 	while (1) {
-		const int opt = getopt(argc, argv, "?a:b:c:d:f:j:k:l:m:p:q:r:s:t:");
+		const int opt = getopt(argc, argv, "?a:b:c:d:e:f:j:k:l:m:o:p:q:r:s:t:");
 
 		if (opt == -1)
 			break;
@@ -178,6 +192,9 @@ static int parse_options(int argc, char *argv[], config_t *config)
 			case 'd':
 				config->db_host = optarg;
 				break;
+			case 'e':
+				PARSE_NUMBER(config->max_pipeline_query_num);
+				break;
 			case 'f':
 				config->template_path = optarg;
 				break;
@@ -193,6 +210,9 @@ static int parse_options(int argc, char *argv[], config_t *config)
 			case 'm':
 				PARSE_NUMBER(config->max_db_conn_num);
 				break;
+			case 'o':
+				PARSE_NUMBER(config->db_timeout);
+				break;
 			case 'p':
 				PARSE_NUMBER(config->port);
 				break;
@@ -234,12 +254,21 @@ static void run_postinitialization_tasks(list_t **tasks, thread_context_t *ctx)
 
 static void set_default_options(config_t *config)
 {
+	if (!config->db_timeout)
+		config->db_timeout = 10;
+
+	if (!config->https_port)
+		config->https_port = 4443;
+
 	if (!config->max_accept)
 		config->max_accept = 10;
 
 	if (!config->max_db_conn_num)
 		config->max_db_conn_num = 10;
 
+	if (!config->max_pipeline_query_num)
+		config->max_pipeline_query_num = 16;
+
 	if (!config->max_query_num)
 		config->max_query_num = 10000;
 
@@ -248,9 +277,6 @@ static void set_default_options(config_t *config)
 
 	if (!config->thread_num)
 		config->thread_num = h2o_numproc();
-
-	if (!config->https_port)
-		config->https_port = 4443;
 }
 
 static void setup_process(void)

+ 18 - 14
frameworks/C/h2o/src/request_handler.c

@@ -29,6 +29,7 @@
 #include "handlers/fortune.h"
 #include "handlers/json_serializer.h"
 #include "handlers/plaintext.h"
+#include "handlers/request_handler_data.h"
 #include "handlers/world.h"
 
 static const char *status_code_to_string(http_status_code_t status_code)
@@ -58,15 +59,15 @@ static const char *status_code_to_string(http_status_code_t status_code)
 	return ret;
 }
 
-void cleanup_request_handlers(global_data_t *global_data)
+void cleanup_request_handler_thread_data(request_handler_thread_data_t *data)
 {
-	cleanup_fortunes_handler(global_data);
-	cleanup_world_handlers(global_data);
+	cleanup_world_handler_thread_data(data);
 }
 
-void free_request_handler_thread_data(request_handler_thread_data_t *request_handler_thread_data)
+void cleanup_request_handlers(request_handler_data_t *data)
 {
-	IGNORE_FUNCTION_PARAMETER(request_handler_thread_data);
+	cleanup_fortunes_handler(data);
+	cleanup_world_handlers(data);
 }
 
 const char *get_query_param(const char *query,
@@ -94,22 +95,25 @@ const char *get_query_param(const char *query,
 	return ret;
 }
 
-void initialize_request_handler_thread_data(
-		const config_t *config, request_handler_thread_data_t *request_handler_thread_data)
+void initialize_request_handler_thread_data(thread_context_t *ctx)
 {
-	IGNORE_FUNCTION_PARAMETER(config);
-	IGNORE_FUNCTION_PARAMETER(request_handler_thread_data);
+	const request_handler_data_t * const data =
+		&ctx->global_thread_data->global_data->request_handler_data;
+
+	initialize_world_handler_thread_data(ctx, data, &ctx->request_handler_data);
 }
 
 void initialize_request_handlers(const config_t *config,
-                                 global_data_t *global_data,
                                  h2o_hostconf_t *hostconf,
-                                 h2o_access_log_filehandle_t *log_handle)
+                                 h2o_access_log_filehandle_t *log_handle,
+                                 list_t **postinitialization_tasks,
+                                 request_handler_data_t *data)
 {
-	initialize_fortunes_handler(config, global_data, hostconf, log_handle);
+	IGNORE_FUNCTION_PARAMETER(postinitialization_tasks);
+	initialize_fortunes_handler(config, hostconf, log_handle, data);
 	initialize_json_serializer_handler(hostconf, log_handle);
 	initialize_plaintext_handler(hostconf, log_handle);
-	initialize_world_handlers(config, global_data, hostconf, log_handle);
+	initialize_world_handlers(hostconf, log_handle, data);
 }
 
 void register_request_handler(const char *path,
@@ -151,7 +155,7 @@ int send_json_response(json_generator_t *gen, bool free_gen, h2o_req_t *req)
 			free_json_generator(gen,
 			                    &ctx->json_generator,
 			                    &ctx->json_generator_num,
-			                    ctx->config->max_json_generator);
+			                    ctx->global_thread_data->config->max_json_generator);
 		}
 		else {
 			h2o_generator_t generator;

+ 9 - 6
frameworks/C/h2o/src/request_handler.h

@@ -25,7 +25,10 @@
 #include <stdbool.h>
 
 #include "global_data.h"
+#include "list.h"
+#include "thread.h"
 #include "utility.h"
+#include "handlers/request_handler_data.h"
 
 #define REQ_ERROR "request error\n"
 
@@ -43,18 +46,18 @@ typedef enum {
 	GATEWAY_TIMEOUT = 504
 } http_status_code_t;
 
-void cleanup_request_handlers(global_data_t *global_data);
-void free_request_handler_thread_data(request_handler_thread_data_t *request_handler_thread_data);
+void cleanup_request_handler_thread_data(request_handler_thread_data_t *data);
+void cleanup_request_handlers(request_handler_data_t *data);
 const char *get_query_param(const char *query,
                             size_t query_len,
                             const char *param,
                             size_t param_len);
-void initialize_request_handler_thread_data(
-		const config_t *config, request_handler_thread_data_t *request_handler_thread_data);
+void initialize_request_handler_thread_data(thread_context_t *ctx);
 void initialize_request_handlers(const config_t *config,
-                                 global_data_t *global_data,
                                  h2o_hostconf_t *hostconf,
-                                 h2o_access_log_filehandle_t *log_handle);
+                                 h2o_access_log_filehandle_t *log_handle,
+                                 list_t **postinitialization_tasks,
+                                 request_handler_data_t *data);
 void register_request_handler(const char *path,
                               int (*handler)(struct st_h2o_handler_t *, h2o_req_t *),
                               h2o_hostconf_t *hostconf,

+ 3 - 8
frameworks/C/h2o/src/thread.c

@@ -31,7 +31,6 @@
 #include <h2o/serverutil.h>
 #include <sys/syscall.h>
 
-#include "database.h"
 #include "error.h"
 #include "event_loop.h"
 #include "global_data.h"
@@ -46,7 +45,7 @@ static void *run_thread(void *arg)
 	thread_context_t ctx;
 
 	initialize_thread_context(arg, false, &ctx);
-	set_thread_memory_allocation_policy(ctx.config->thread_num);
+	set_thread_memory_allocation_policy(ctx.global_thread_data->config->thread_num);
 	event_loop(&ctx);
 	free_thread_context(&ctx);
 	pthread_exit(NULL);
@@ -87,9 +86,8 @@ static void set_thread_memory_allocation_policy(size_t thread_num)
 
 void free_thread_context(thread_context_t *ctx)
 {
-	free_database_state(ctx->event_loop.h2o_ctx.loop, &ctx->db_state);
+	cleanup_request_handler_thread_data(&ctx->request_handler_data);
 	free_event_loop(&ctx->event_loop, &ctx->global_thread_data->h2o_receiver);
-	free_request_handler_thread_data(&ctx->request_handler_data);
 
 	if (ctx->json_generator)
 		do {
@@ -129,16 +127,13 @@ void initialize_thread_context(global_thread_data_t *global_thread_data,
                                thread_context_t *ctx)
 {
 	memset(ctx, 0, sizeof(*ctx));
-	ctx->config = global_thread_data->config;
-	ctx->global_data = global_thread_data->global_data;
 	ctx->global_thread_data = global_thread_data;
 	ctx->random_seed = syscall(SYS_gettid);
 	initialize_event_loop(is_main_thread,
 	                      global_thread_data->global_data,
 	                      &global_thread_data->h2o_receiver,
 	                      &ctx->event_loop);
-	initialize_database_state(ctx->event_loop.h2o_ctx.loop, &ctx->db_state);
-	initialize_request_handler_thread_data(ctx->config, &ctx->request_handler_data);
+	initialize_request_handler_thread_data(ctx);
 	global_thread_data->ctx = ctx;
 }
 

+ 4 - 10
frameworks/C/h2o/src/thread.h

@@ -25,36 +25,30 @@
 #include <pthread.h>
 #include <stdbool.h>
 
-#include "database.h"
 #include "event_loop.h"
 #include "global_data.h"
 #include "list.h"
 #include "handlers/request_handler_data.h"
 
-typedef struct thread_context_t thread_context_t;
+struct thread_context_t;
 
 typedef struct global_thread_data_t {
 	const config_t *config;
-	thread_context_t *ctx;
+	struct thread_context_t *ctx;
 	global_data_t *global_data;
 	h2o_multithread_receiver_t h2o_receiver;
 	pthread_t thread;
 } global_thread_data_t;
 
-struct thread_context_t {
-	const config_t *config;
-	global_data_t *global_data;
-	// global_thread_data contains config and global_data as well,
-	// but keep copies here to avoid some pointer chasing.
+typedef struct thread_context_t {
 	global_thread_data_t *global_thread_data;
 	list_t *json_generator;
 	size_t json_generator_num;
 	unsigned random_seed;
 	bool shutdown;
-	db_state_t db_state;
 	event_loop_t event_loop;
 	request_handler_thread_data_t request_handler_data;
-};
+} thread_context_t;
 
 void free_thread_context(thread_context_t *ctx);
 global_thread_data_t *initialize_global_thread_data(const config_t *config,

+ 1 - 1
frameworks/C/h2o/src/utility.c

@@ -32,7 +32,7 @@
 #include "list.h"
 #include "utility.h"
 
-#define DEFAULT_CACHE_LINE_SIZE 128
+#define DEFAULT_CACHE_LINE_SIZE 256
 
 static list_t *get_sorted_sublist(list_t *head, int (*compare)(const list_t *, const list_t *));
 static list_t *merge_lists(list_t *head1,