Selaa lähdekoodia

H2O: Improve database query throughput (#8253)

Improve the database query throughput at the cost of a slight
increase in the minimum achievable latency (reached, for example,
when there is only one query in progress at any given time).
This trade-off assumes that the communication latency between the
application server and the database is much larger than the time
it takes to perform a system call.
Anton Kirilov 2 vuotta sitten
vanhempi
commit
2c50df3b09

+ 90 - 89
frameworks/C/h2o/src/database.c

@@ -30,6 +30,7 @@
 
 #include "database.h"
 #include "error.h"
+#include "event_loop.h"
 #include "global_data.h"
 #include "list.h"
 
@@ -37,9 +38,8 @@
 
 // Database connection state
 #define EXPECT_SYNC 1
-#define IDLE 2
-#define IGNORE_RESULT 4
-#define RESET 8
+#define IGNORE_RESULT 2
+#define RESET 4
 
 typedef struct {
 	list_t l;
@@ -61,8 +61,9 @@ typedef struct {
 } prepared_statement_t;
 
 static h2o_socket_t *create_socket(int sd, h2o_loop_t *loop);
-static int do_execute_query(db_conn_t *conn, db_query_param_t *param, bool flush);
+static int do_execute_query(db_conn_t *conn, db_query_param_t *param);
 static void error_notification(db_conn_pool_t *pool, bool timeout, const char *error_string);
+static int flush_connection(h2o_socket_cb cb, db_conn_t *conn);
 static void on_database_connect_error(db_conn_t *conn, bool timeout, const char *error_string);
 static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err);
 static void on_database_connect_timeout(h2o_timer_t *timer);
@@ -71,9 +72,11 @@ static void on_database_error(db_conn_t *conn, const char *error_string);
 static void on_database_read_ready(h2o_socket_t *sock, const char *err);
 static void on_database_timeout(h2o_timer_t *timer);
 static void on_database_write_ready(h2o_socket_t *sock, const char *err);
+static void on_process_queries(void *arg);
 static void poll_database_connection(h2o_socket_t *sock, const char *err);
 static void prepare_statements(db_conn_t *conn);
 static void process_queries(db_conn_t *conn);
+static void remove_connection(db_conn_t *conn);
 static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn);
 
 static h2o_socket_t *create_socket(int sd, h2o_loop_t *loop)
@@ -104,7 +107,7 @@ static h2o_socket_t *create_socket(int sd, h2o_loop_t *loop)
 	return ret;
 }
 
-static int do_execute_query(db_conn_t *conn, db_query_param_t *param, bool flush)
+static int do_execute_query(db_conn_t *conn, db_query_param_t *param)
 {
 	assert(conn->query_num);
 	assert((conn->queries.head && conn->query_num < conn->pool->config->max_pipeline_query_num) ||
@@ -137,17 +140,6 @@ static int do_execute_query(db_conn_t *conn, db_query_param_t *param, bool flush
 		return 1;
 	}
 
-	if (flush) {
-		const int send_status = PQflush(conn->conn);
-
-		if (send_status < 0) {
-			LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
-			return 1;
-		}
-		else if (send_status)
-			h2o_socket_notify_write(conn->sock, on_database_write_ready);
-	}
-
 	if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IGNORE_RESULT))) {
 		assert(!h2o_timer_is_linked(&conn->timer));
 		conn->timer.cb = on_database_timeout;
@@ -158,7 +150,6 @@ static int do_execute_query(db_conn_t *conn, db_query_param_t *param, bool flush
 	*conn->queries.tail = &param->l;
 	conn->queries.tail = &param->l.next;
 	conn->query_num--;
-	conn->flags &= ~IDLE;
 	return 0;
 }
 
@@ -189,6 +180,18 @@ static void error_notification(db_conn_pool_t *pool, bool timeout, const char *e
 	}
 }
 
+static int flush_connection(h2o_socket_cb cb, db_conn_t *conn)
+{
+	const int send_status = PQflush(conn->conn);
+
+	if (send_status < 0)
+		LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
+	else if (send_status)
+		h2o_socket_notify_write(conn->sock, cb);
+
+	return send_status < 0;
+}
+
 static void on_database_connect_error(db_conn_t *conn, bool timeout, const char *error_string)
 {
 	db_conn_pool_t * const pool = conn->pool;
@@ -217,17 +220,10 @@ static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
 		return;
 	}
 
-	const int send_status = PQflush(conn->conn);
-
-	if (send_status < 0) {
-		LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
+	if (flush_connection(on_database_connect_write_ready, conn)) {
 		on_database_connect_error(conn, false, DB_ERROR);
 		return;
 	}
-	else if (send_status) {
-		h2o_socket_notify_write(conn->sock, on_database_connect_write_ready);
-		return;
-	}
 
 	while (!PQisBusy(conn->conn)) {
 		PGresult * const result = PQgetResult(conn->conn);
@@ -271,16 +267,8 @@ static void on_database_connect_write_ready(h2o_socket_t *sock, const char *err)
 		ERROR(err);
 		on_database_connect_error(conn, false, err);
 	}
-	else {
-		const int send_status = PQflush(conn->conn);
-
-		if (send_status < 0) {
-			LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
-			on_database_connect_error(conn, false, DB_ERROR);
-		}
-		else if (send_status)
-			h2o_socket_notify_write(conn->sock, on_database_connect_write_ready);
-	}
+	else if (flush_connection(on_database_connect_write_ready, conn))
+		on_database_connect_error(conn, false, DB_ERROR);
 }
 
 static void on_database_error(db_conn_t *conn, const char *error_string)
@@ -303,6 +291,9 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
 {
 	db_conn_t * const conn = sock->data;
 
+	// This call should not be problematic, assuming a relatively low number of connections.
+	remove_connection(conn);
+
 	if (err) {
 		ERROR(err);
 		on_database_error(conn, err);
@@ -315,17 +306,10 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
 		return;
 	}
 
-	const int send_status = PQflush(conn->conn);
-
-	if (send_status < 0) {
-		LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
+	if (flush_connection(on_database_write_ready, conn)) {
 		on_database_error(conn, DB_ERROR);
 		return;
 	}
-	else if (send_status) {
-		h2o_socket_notify_write(conn->sock, on_database_write_ready);
-		return;
-	}
 
 	while (!PQisBusy(conn->conn)) {
 		PGresult * const result = PQgetResult(conn->conn);
@@ -392,6 +376,7 @@ static void on_database_timeout(h2o_timer_t *timer)
 {
 	db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, timer, timer);
 
+	remove_connection(conn);
 	ERROR(DB_TIMEOUT_ERROR);
 
 	if (conn->queries.head) {
@@ -412,20 +397,31 @@ static void on_database_write_ready(h2o_socket_t *sock, const char *err)
 
 	if (err) {
 		ERROR(err);
+		remove_connection(conn);
 		on_database_error(conn, err);
 	}
-	else {
-		const int send_status = PQflush(conn->conn);
+	else if (flush_connection(on_database_write_ready, conn)) {
+		remove_connection(conn);
+		on_database_error(conn, DB_ERROR);
+	}
+}
 
-		if (send_status < 0) {
-			LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
-			on_database_error(conn, DB_ERROR);
-		}
-		else if (send_status)
-			h2o_socket_notify_write(conn->sock, on_database_write_ready);
-		else
-			process_queries(conn);
+static void on_process_queries(void *arg)
+{
+	db_conn_pool_t * const pool = arg;
+
+	while (pool->queries.head && pool->conn) {
+		db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn);
+
+		pool->conn = conn->l.next;
+		assert(conn->query_num);
+		process_queries(conn);
 	}
+
+	if (pool->queries.head && pool->conn_num)
+		start_database_connect(pool, NULL);
+
+	pool->process_queries = false;
 }
 
 static void poll_database_connection(h2o_socket_t *sock, const char *err)
@@ -563,7 +559,7 @@ static void process_queries(db_conn_t *conn)
 
 		conn->pool->queries.head = param->l.next;
 
-		if (do_execute_query(conn, param, false)) {
+		if (do_execute_query(conn, param)) {
 			param->on_error(param, DB_ERROR);
 
 			if (PQstatus(conn->conn) != CONNECTION_OK) {
@@ -573,25 +569,26 @@ static void process_queries(db_conn_t *conn)
 		}
 	}
 
-	if (flush) {
-		const int send_status = PQflush(conn->conn);
-
-		if (send_status < 0) {
-			LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
-			on_database_error(conn, DB_ERROR);
-			return;
-		}
-		else if (send_status)
-			h2o_socket_notify_write(conn->sock, on_database_write_ready);
-	}
-
-	if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IDLE | IGNORE_RESULT))) {
+	if (flush && flush_connection(on_database_write_ready, conn))
+		on_database_error(conn, DB_ERROR);
+	else if (conn->query_num) {
 		conn->l.next = conn->pool->conn;
 		conn->pool->conn = &conn->l;
-		conn->flags |= IDLE;
 	}
 }
 
+static void remove_connection(db_conn_t *conn)
+{
+	list_t *iter = conn->pool->conn;
+	list_t **prev = &conn->pool->conn;
+
+	for (; iter && iter != &conn->l; iter = iter->next)
+		prev = &iter->next;
+
+	if (iter)
+		*prev = iter->next;
+}
+
 static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn)
 {
 	if (conn) {
@@ -666,34 +663,37 @@ int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param)
 {
 	int ret = 1;
 
-	if (pool->conn) {
-		db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn);
+	if (pool->query_num) {
+		if (pool->conn) {
+			// Delay sending the database queries to the server, so that if there is a rapid
+			// succession of calls to this function, all resultant queries would be inserted
+			// into a command pipeline with a single system call.
+			if (!pool->process_queries) {
+				task_message_t * const msg = h2o_mem_alloc(sizeof(*msg));
+
+				memset(msg, 0, sizeof(*msg));
+				msg->arg = pool;
+				msg->super.type = TASK;
+				msg->task = on_process_queries;
+				send_message(&msg->super, pool->receiver);
+				pool->process_queries = true;
+			}
 
-		assert(!conn->queries.head);
-		assert(conn->flags & IDLE);
-		assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT)));
-		pool->conn = conn->l.next;
-		ret = do_execute_query(conn, param, true);
+			ret = 0;
+		}
+		else {
+			if (pool->conn_num)
+				start_database_connect(pool, NULL);
 
-		if (ret) {
-			if (PQstatus(conn->conn) == CONNECTION_OK) {
-				conn->l.next = pool->conn;
-				pool->conn = &conn->l;
-			}
-			else
-				start_database_connect(conn->pool, conn);
+			if (pool->conn_num < pool->config->max_db_conn_num && pool->query_num)
+				ret = 0;
 		}
-	}
-	else if (pool->query_num) {
-		if (pool->conn_num)
-			start_database_connect(pool, NULL);
 
-		if (pool->conn_num < pool->config->max_db_conn_num && pool->query_num) {
+		if (!ret) {
 			param->l.next = NULL;
 			*pool->queries.tail = &param->l;
 			pool->queries.tail = &param->l.next;
 			pool->query_num--;
-			ret = 0;
 		}
 	}
 
@@ -713,7 +713,6 @@ void free_database_connection_pool(db_conn_pool_t *pool)
 
 			assert(!conn->queries.head);
 			assert(conn->query_num == pool->config->max_pipeline_query_num);
-			assert(conn->flags & IDLE);
 			assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT | RESET)));
 			assert(!h2o_timer_is_linked(&conn->timer));
 			h2o_socket_read_stop(conn->sock);
@@ -731,6 +730,7 @@ void initialize_database_connection_pool(const char *conninfo,
                                          const struct config_t *config,
                                          const list_t *prepared_statements,
                                          h2o_loop_t *loop,
+                                         h2o_multithread_receiver_t *receiver,
                                          db_conn_pool_t *pool)
 {
 	memset(pool, 0, sizeof(*pool));
@@ -739,6 +739,7 @@ void initialize_database_connection_pool(const char *conninfo,
 	pool->loop = loop;
 	pool->prepared_statements = prepared_statements;
 	pool->queries.tail = &pool->queries.head;
+	pool->receiver = receiver;
 	pool->conn_num = config->max_db_conn_num;
 	pool->query_num = config->max_query_num;
 }

+ 4 - 0
frameworks/C/h2o/src/database.h

@@ -22,6 +22,7 @@
 #define DATABASE_H_
 
 #include <h2o.h>
+#include <stdbool.h>
 #include <stdint.h>
 #include <postgresql/libpq-fe.h>
 
@@ -63,11 +64,13 @@ typedef struct {
 	const char *conninfo;
 	h2o_loop_t *loop;
 	const list_t *prepared_statements;
+	h2o_multithread_receiver_t *receiver;
 	// We use a FIFO queue instead of a simpler stack, otherwise the earlier queries may wait
 	// an unbounded amount of time to be executed.
 	queue_t queries;
 	size_t conn_num;
 	size_t query_num;
+	bool process_queries;
 } db_conn_pool_t;
 
 void add_prepared_statement(const char *name, const char *query, list_t **prepared_statements);
@@ -77,6 +80,7 @@ void initialize_database_connection_pool(const char *conninfo,
                                          const struct config_t *config,
                                          const list_t *prepared_statements,
                                          h2o_loop_t *loop,
+                                         h2o_multithread_receiver_t *receiver,
                                          db_conn_pool_t *pool);
 void remove_prepared_statements(list_t *prepared_statements);
 

+ 16 - 2
frameworks/C/h2o/src/event_loop.c

@@ -198,6 +198,13 @@ static void process_messages(h2o_multithread_receiver_t *receiver, h2o_linklist_
 
 				global_thread_data->ctx->shutdown = true;
 				break;
+			case TASK:
+			{
+				task_message_t * const task = H2O_STRUCT_FROM_MEMBER(task_message_t, super, msg);
+
+				task->task(task->arg);
+				break;
+			}
 			default:
 				break;
 		}
@@ -232,13 +239,15 @@ static void shutdown_server(h2o_socket_t *listener, const char *err)
 			ctx->event_loop.h2o_socket = NULL;
 		}
 
+		global_thread_data_t * const global_thread_data =
+			ctx->global_thread_data->global_data->global_thread_data;
+
 		for (size_t i = ctx->global_thread_data->config->thread_num - 1; i > 0; i--) {
 			message_t * const msg = h2o_mem_alloc(sizeof(*msg));
 
 			memset(msg, 0, sizeof(*msg));
 			msg->type = SHUTDOWN;
-			h2o_multithread_send_message(&ctx->global_thread_data[i].h2o_receiver,
-			                             &msg->super);
+			send_message(msg, &global_thread_data[i].h2o_receiver);
 		}
 	}
 }
@@ -322,3 +331,8 @@ void initialize_event_loop(bool is_main_thread,
 		h2o_socket_read_start(global_data->signals, shutdown_server);
 	}
 }
+
+void send_message(message_t *msg, h2o_multithread_receiver_t *h2o_receiver)
+{
+	h2o_multithread_send_message(h2o_receiver, &msg->super);
+}

+ 9 - 1
frameworks/C/h2o/src/event_loop.h

@@ -28,7 +28,8 @@
 #include "global_data.h"
 
 typedef enum {
-	SHUTDOWN
+	SHUTDOWN,
+	TASK
 } message_type_t;
 
 struct thread_context_t;
@@ -46,11 +47,18 @@ typedef struct {
 	h2o_multithread_message_t super;
 } message_t;
 
+typedef struct {
+	message_t super;
+	void *arg;
+	void (*task)(void *);
+} task_message_t;
+
 void event_loop(struct thread_context_t *ctx);
 void free_event_loop(event_loop_t *event_loop, h2o_multithread_receiver_t *h2o_receiver);
 void initialize_event_loop(bool is_main_thread,
                            global_data_t *global_data,
                            h2o_multithread_receiver_t *h2o_receiver,
                            event_loop_t *loop);
+void send_message(message_t *msg, h2o_multithread_receiver_t *h2o_receiver);
 
 #endif // EVENT_LOOP_H_

+ 0 - 5
frameworks/C/h2o/src/global_data.h

@@ -54,7 +54,6 @@ typedef struct config_t {
 typedef struct {
 	h2o_logger_t *file_logger;
 	struct global_thread_data_t *global_thread_data;
-	list_t *postinitialization_tasks;
 	h2o_socket_t *signals;
 	SSL_CTX *ssl_ctx;
 	size_t memory_alignment;
@@ -64,8 +63,4 @@ typedef struct {
 	request_handler_data_t request_handler_data;
 } global_data_t;
 
-void add_postinitialization_task(void (*task)(struct thread_context_t *, void *),
-                                 void *arg,
-                                 list_t **postinitialization_tasks);
-
 #endif // GLOBAL_DATA_H_

+ 1 - 0
frameworks/C/h2o/src/handlers/world.c

@@ -857,6 +857,7 @@ void initialize_world_handler_thread_data(thread_context_t *ctx,
 	                                    ctx->global_thread_data->config,
 	                                    data->prepared_statements,
 	                                    ctx->event_loop.h2o_ctx.loop,
+	                                    &ctx->global_thread_data->h2o_receiver,
 	                                    &thread_data->hello_world_db);
 }
 

+ 2 - 35
frameworks/C/h2o/src/main.c

@@ -21,6 +21,7 @@
 #include <h2o.h>
 #include <pthread.h>
 #include <signal.h>
+#include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -62,16 +63,9 @@
 	"[-s <HTTPS port>] " \
 	"[-t <thread number>]\n"
 
-typedef struct {
-	list_t l;
-	void *arg;
-	void (*task)(thread_context_t *, void *);
-} task_t;
-
 static void free_global_data(global_data_t *global_data);
 static int initialize_global_data(const config_t *config, global_data_t *global_data);
 static int parse_options(int argc, char *argv[], config_t *config);
-static void run_postinitialization_tasks(list_t **tasks, thread_context_t *ctx);
 static void set_default_options(config_t *config);
 static void setup_process(void);
 
@@ -129,7 +123,6 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
 	initialize_request_handlers(config,
 	                            hostconf,
 	                            log_handle,
-	                            &global_data->postinitialization_tasks,
 	                            &global_data->request_handler_data);
 
 	// Must be registered after the rest of the request handlers.
@@ -253,18 +246,6 @@ static int parse_options(int argc, char *argv[], config_t *config)
 	return 0;
 }
 
-static void run_postinitialization_tasks(list_t **tasks, thread_context_t *ctx)
-{
-	if (*tasks)
-		do {
-			task_t * const t = H2O_STRUCT_FROM_MEMBER(task_t, l, *tasks);
-
-			*tasks = (*tasks)->next;
-			t->task(ctx, t->arg);
-			free(t);
-		} while (*tasks);
-}
-
 static void set_default_options(config_t *config)
 {
 	if (!config->db_timeout)
@@ -280,7 +261,7 @@ static void set_default_options(config_t *config)
 		config->max_db_conn_num = 1;
 
 	if (!config->max_pipeline_query_num)
-		config->max_pipeline_query_num = 16;
+		config->max_pipeline_query_num = SIZE_MAX;
 
 	if (!config->max_query_num)
 		config->max_query_num = 10000;
@@ -313,19 +294,6 @@ static void setup_process(void)
 	CHECK_ERRNO(setrlimit, RLIMIT_NOFILE, &rlim);
 }
 
-void add_postinitialization_task(void (*task)(struct thread_context_t *, void *),
-                                 void *arg,
-                                 list_t **postinitialization_tasks)
-{
-	task_t * const t = h2o_mem_alloc(sizeof(*t));
-
-	memset(t, 0, sizeof(*t));
-	t->l.next = *postinitialization_tasks;
-	t->arg = arg;
-	t->task = task;
-	*postinitialization_tasks = &t->l;
-}
-
 int main(int argc, char *argv[])
 {
 	config_t config;
@@ -340,7 +308,6 @@ int main(int argc, char *argv[])
 			setup_process();
 			start_threads(global_data.global_thread_data);
 			initialize_thread_context(global_data.global_thread_data, true, &ctx);
-			run_postinitialization_tasks(&global_data.postinitialization_tasks, &ctx);
 			event_loop(&ctx);
 			// Even though this is global data, we need to close
 			// it before the associated event loop is cleaned up.

+ 0 - 2
frameworks/C/h2o/src/request_handler.c

@@ -106,10 +106,8 @@ void initialize_request_handler_thread_data(thread_context_t *ctx)
 void initialize_request_handlers(const config_t *config,
                                  h2o_hostconf_t *hostconf,
                                  h2o_access_log_filehandle_t *log_handle,
-                                 list_t **postinitialization_tasks,
                                  request_handler_data_t *data)
 {
-	IGNORE_FUNCTION_PARAMETER(postinitialization_tasks);
 	initialize_fortunes_handler(config, hostconf, log_handle, data);
 	initialize_json_serializer_handler(hostconf, log_handle);
 	initialize_plaintext_handler(hostconf, log_handle);

+ 0 - 1
frameworks/C/h2o/src/request_handler.h

@@ -56,7 +56,6 @@ void initialize_request_handler_thread_data(thread_context_t *ctx);
 void initialize_request_handlers(const config_t *config,
                                  h2o_hostconf_t *hostconf,
                                  h2o_access_log_filehandle_t *log_handle,
-                                 list_t **postinitialization_tasks,
                                  request_handler_data_t *data);
 void register_request_handler(const char *path,
                               int (*handler)(struct st_h2o_handler_t *, h2o_req_t *),