Browse Source

H2O: Improve the load balancing across database connections (#9039)

Also, decrease the memory consumption of the cached queries test.
Anton Kirilov 1 year ago
parent
commit
6804613281

+ 120 - 79
frameworks/C/h2o/src/database.c

@@ -75,7 +75,7 @@ static void on_database_write_ready(h2o_socket_t *sock, const char *err);
 static void on_process_queries(void *arg);
 static void poll_database_connection(h2o_socket_t *sock, const char *err);
 static void prepare_statements(db_conn_t *conn);
-static void process_queries(db_conn_t *conn, bool removed);
+static void process_queries(db_conn_pool_t *pool);
 static void remove_connection(db_conn_t *conn);
 static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn);
 
@@ -237,7 +237,10 @@ static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
 					h2o_timer_unlink(&conn->timer);
 					h2o_socket_read_stop(conn->sock);
 					h2o_socket_read_start(conn->sock, on_database_read_ready);
-					process_queries(conn, true);
+					*conn->pool->conn.tail = &conn->l;
+					conn->pool->conn.tail = &conn->l.next;
+					conn->l.next = NULL;
+					process_queries(conn->pool);
 					return;
 				default:
 					LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
@@ -370,7 +373,13 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
 	for (PGnotify *notify = PQnotifies(conn->conn); notify; notify = PQnotifies(conn->conn))
 		PQfreemem(notify);
 
-	process_queries(conn, removed);
+	if (removed && conn->query_num) {
+		*conn->pool->conn.tail = &conn->l;
+		conn->pool->conn.tail = &conn->l.next;
+		conn->l.next = NULL;
+	}
+
+	process_queries(conn->pool);
 }
 
 static void on_database_timeout(h2o_timer_t *timer)
@@ -405,20 +414,83 @@ static void on_database_write_ready(h2o_socket_t *sock, const char *err)
 
 static void on_process_queries(void *arg)
 {
+	list_t *iter = NULL;
 	db_conn_pool_t * const pool = arg;
+	size_t query_num = 0;
 
-	while (pool->queries.head && pool->conn) {
-		db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn);
+	while (pool->queries.head && pool->conn.head) {
+		db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn.head);
+		db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER(db_query_param_t,
+		                                                        l,
+		                                                        conn->pool->queries.head);
 
-		pool->conn = conn->l.next;
 		assert(conn->query_num);
-		process_queries(conn, true);
+		assert(pool->query_num < pool->config->max_query_num);
+		pool->conn.head = conn->l.next;
+		pool->queries.head = param->l.next;
+
+		if (!pool->conn.head) {
+			assert(pool->conn.tail == &conn->l.next);
+			pool->conn.tail = &pool->conn.head;
+		}
+
+		if (++pool->query_num == pool->config->max_query_num) {
+			assert(!pool->queries.head);
+			assert(pool->queries.tail == &param->l.next);
+			pool->queries.tail = &pool->queries.head;
+		}
+
+		if (do_execute_query(conn, param)) {
+			param->on_error(param, DB_ERROR);
+			on_database_error(conn, DB_ERROR);
+		}
+		else {
+			query_num++;
+
+			if (conn->query_num) {
+				*pool->conn.tail = &conn->l;
+				pool->conn.tail = &conn->l.next;
+				conn->l.next = NULL;
+			}
+			else {
+				conn->l.next = iter;
+				iter = &conn->l;
+			}
+		}
 	}
 
-	if (pool->queries.head && pool->conn_num)
-		start_database_connect(pool, NULL);
+	if (iter)
+		do {
+			db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, iter);
+
+			iter = conn->l.next;
+
+			if (flush_connection(on_database_write_ready, conn))
+				on_database_error(conn, DB_ERROR);
+		} while (iter);
 
+	pool->conn.tail = &pool->conn.head;
 	pool->process_queries = false;
+	query_num += pool->config->max_query_num - pool->query_num;
+
+	for (iter = pool->conn.head; iter;) {
+		db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, iter);
+
+		iter = conn->l.next;
+
+		if (flush_connection(on_database_write_ready, conn)) {
+			*pool->conn.tail = iter;
+			on_database_error(conn, DB_ERROR);
+		}
+		else
+			pool->conn.tail = &conn->l.next;
+	}
+
+	const size_t conn_num = pool->config->max_db_conn_num - pool->conn_num;
+
+	if (query_num > conn_num)
+		for (query_num -= conn_num; pool->conn_num && query_num; query_num--)
+			start_database_connect(pool, NULL);
 }
 
 static void poll_database_connection(h2o_socket_t *sock, const char *err)
@@ -536,54 +608,44 @@ static void prepare_statements(db_conn_t *conn)
 	}
 	else {
 		h2o_socket_read_start(conn->sock, on_database_read_ready);
-		process_queries(conn, true);
+		*conn->pool->conn.tail = &conn->l;
+		conn->pool->conn.tail = &conn->l.next;
+		conn->l.next = NULL;
+		process_queries(conn->pool);
 	}
 }
 
-static void process_queries(db_conn_t *conn, bool removed)
+static void process_queries(db_conn_pool_t *pool)
 {
-	const bool flush = conn->query_num && conn->pool->queries.head;
-
-	while (conn->query_num && conn->pool->queries.head) {
-		db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER(db_query_param_t,
-		                                                        l,
-		                                                        conn->pool->queries.head);
-
-		if (++conn->pool->query_num == conn->pool->config->max_query_num) {
-			assert(conn->pool->queries.tail == &param->l.next);
-			conn->pool->queries.tail = &conn->pool->queries.head;
-		}
-
-		conn->pool->queries.head = param->l.next;
-
-		if (do_execute_query(conn, param)) {
-			param->on_error(param, DB_ERROR);
-			on_database_error(conn, DB_ERROR);
-			return;
-		}
-	}
-
-	if (flush && flush_connection(on_database_write_ready, conn))
-		on_database_error(conn, DB_ERROR);
-	else if (conn->query_num && removed) {
-		conn->l.next = conn->pool->conn;
-		conn->pool->conn = &conn->l;
+	if (!pool->process_queries && pool->queries.head) {
+		task_message_t * const msg = h2o_mem_alloc(sizeof(*msg));
+
+		assert(pool->query_num < pool->config->max_query_num);
+		memset(msg, 0, sizeof(*msg));
+		msg->arg = pool;
+		msg->super.type = TASK;
+		msg->task = on_process_queries;
+		pool->process_queries = true;
+		send_local_message(&msg->super, pool->local_messages);
 	}
-	else if (!conn->query_num && !removed)
-		// This call should not be problematic, assuming a relatively low number of connections.
-		remove_connection(conn);
 }
 
 static void remove_connection(db_conn_t *conn)
 {
-	list_t *iter = conn->pool->conn;
-	list_t **prev = &conn->pool->conn;
+	list_t *iter = conn->pool->conn.head;
+	list_t **prev = &conn->pool->conn.head;
 
 	for (; iter && iter != &conn->l; iter = iter->next)
 		prev = &iter->next;
 
-	if (iter)
+	if (iter) {
 		*prev = iter->next;
+
+		if (!conn->pool->conn.head) {
+			assert(conn->pool->conn.tail == &iter->next);
+			conn->pool->conn.tail = &conn->pool->conn.head;
+		}
+	}
 }
 
 static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn)
@@ -661,37 +723,15 @@ int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param)
 	int ret = 1;
 
 	if (pool->query_num) {
-		if (pool->conn) {
-			// Delay sending the database queries to the server, so that if there is a rapid
-			// succession of calls to this function, all resultant queries would be inserted
-			// into a command pipeline with a smaller number of system calls.
-			if (!pool->process_queries) {
-				task_message_t * const msg = h2o_mem_alloc(sizeof(*msg));
-
-				memset(msg, 0, sizeof(*msg));
-				msg->arg = pool;
-				msg->super.type = TASK;
-				msg->task = on_process_queries;
-				send_local_message(&msg->super, pool->local_messages);
-				pool->process_queries = true;
-			}
-
-			ret = 0;
-		}
-		else {
-			if (pool->conn_num)
-				start_database_connect(pool, NULL);
-
-			if (pool->conn_num < pool->config->max_db_conn_num && pool->query_num)
-				ret = 0;
-		}
-
-		if (!ret) {
-			param->l.next = NULL;
-			*pool->queries.tail = &param->l;
-			pool->queries.tail = &param->l.next;
-			pool->query_num--;
-		}
+		// Delay sending the database queries to the server, so that if there is a rapid
+		// succession of calls to this function, all resultant queries would be inserted
+		// into a command pipeline with a smaller number of system calls.
+		param->l.next = NULL;
+		*pool->queries.tail = &param->l;
+		pool->queries.tail = &param->l.next;
+		pool->query_num--;
+		process_queries(pool);
+		ret = 0;
 	}
 
 	return ret;
@@ -704,9 +744,9 @@ void free_database_connection_pool(db_conn_pool_t *pool)
 
 	size_t num = 0;
 
-	if (pool->conn)
+	if (pool->conn.head)
 		do {
-			db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn);
+			db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn.head);
 
 			assert(!conn->queries.head);
 			assert(conn->query_num == pool->config->max_pipeline_query_num);
@@ -715,10 +755,10 @@ void free_database_connection_pool(db_conn_pool_t *pool)
 			h2o_socket_read_stop(conn->sock);
 			h2o_socket_close(conn->sock);
 			PQfinish(conn->conn);
-			pool->conn = pool->conn->next;
-			free(conn);
+			pool->conn.head = conn->l.next;
 			num++;
-		} while (pool->conn);
+			free(conn);
+		} while (pool->conn.head);
 
 	assert(num + pool->conn_num == pool->config->max_db_conn_num);
 }
@@ -732,6 +772,7 @@ void initialize_database_connection_pool(const char *conninfo,
 {
 	memset(pool, 0, sizeof(*pool));
 	pool->config = config;
+	pool->conn.tail = &pool->conn.head;
 	pool->conninfo = conninfo ? conninfo : "";
 	pool->local_messages = local_messages;
 	pool->loop = loop;

+ 1 - 1
frameworks/C/h2o/src/database.h

@@ -60,7 +60,7 @@ typedef struct db_query_param_t {
 
 typedef struct {
 	const struct config_t *config;
-	list_t *conn;
+	queue_t conn;
 	const char *conninfo;
 	h2o_linklist_t *local_messages;
 	h2o_loop_t *loop;

+ 4 - 3
frameworks/C/h2o/src/handlers/world.c

@@ -416,7 +416,7 @@ static void fetch_from_cache(uint64_t now,
 		h2o_cache_ref_t * const r = h2o_cache_fetch(data->world_cache, now, key, 0);
 
 		if (r) {
-			const uint32_t * const table = (const uint32_t *) r->value.base;
+			const uint16_t * const table = (const uint16_t *) r->value.base;
 
 			for (size_t i = 0; i < query_ctx->num_query; i++) {
 				const uint32_t id = query_ctx->res[i].id;
@@ -440,7 +440,7 @@ static void fetch_from_cache(uint64_t now,
 			memset(ctx, 0, sizeof(*ctx));
 			ctx->data = data;
 			ctx->loop = query_ctx->ctx->event_loop.h2o_ctx.loop;
-			ctx->table.len = (MAX_ID + 1) * sizeof(uint32_t);
+			ctx->table.len = (MAX_ID + 1) * sizeof(uint16_t);
 			ctx->table.base = h2o_mem_alloc(ctx->table.len);
 			memset(ctx->table.base, 0, ctx->table.len);
 			ctx->param.command = POPULATE_CACHE_QUERY;
@@ -605,10 +605,11 @@ static result_return_t on_populate_cache_result(db_query_param_t *param, PGresul
 		                                                                param,
 		                                                                param);
 		query_result_t r = {.id = 0};
-		uint32_t * const table = (uint32_t *) query_ctx->table.base;
+		uint16_t * const table = (uint16_t *) query_ctx->table.base;
 
 		for (size_t i = 0; i < num_rows; i++) {
 			process_result(result, i, &r);
+			assert(r.random_number <= UINT16_MAX);
 			table[r.id] = r.random_number;
 		}