Răsfoiți Sursa

H2O: Move the thread-specific data to each thread's stack

Anton Kirilov 8 ani în urmă
părinte
comite
f0ca77cb05

+ 4 - 5
frameworks/C/h2o/src/database.c

@@ -353,8 +353,7 @@ static void start_database_connect(thread_context_t *ctx, db_conn_t *db_conn)
 			goto error;
 		}
 
-		const char * const conninfo =
-			ctx->global_data->config->db_host ? ctx->global_data->config->db_host : "";
+		const char * const conninfo = ctx->config->db_host ? ctx->config->db_host : "";
 
 		db_conn->conn = PQconnectStart(conninfo);
 
@@ -441,7 +440,7 @@ static void stop_database_write_polling(db_conn_t *db_conn)
 
 void connect_to_database(thread_context_t *ctx)
 {
-	for (size_t i = ctx->db_state.db_conn_num; i < ctx->global_data->config->max_db_conn_num; i++)
+	for (size_t i = ctx->db_state.db_conn_num; i < ctx->config->max_db_conn_num; i++)
 		start_database_connect(ctx, NULL);
 }
 
@@ -475,13 +474,13 @@ int execute_query(thread_context_t *ctx, db_query_param_t *param)
 		db_conn->param = param;
 		do_execute_query(db_conn);
 	}
-	else if (ctx->db_state.query_num < ctx->global_data->config->max_query_num) {
+	else if (ctx->db_state.query_num < ctx->config->max_query_num) {
 		param->l.next = NULL;
 		*ctx->db_state.queries.tail = &param->l;
 		ctx->db_state.queries.tail = &param->l.next;
 		ctx->db_state.query_num++;
 
-		if (ctx->db_state.db_conn_num < ctx->global_data->config->max_db_conn_num)
+		if (ctx->db_state.db_conn_num < ctx->config->max_db_conn_num)
 			start_database_connect(ctx, NULL);
 	}
 	else

+ 11 - 14
frameworks/C/h2o/src/event_loop.c

@@ -24,7 +24,6 @@
 #include <string.h>
 #include <unistd.h>
 #include <sys/epoll.h>
-#include <sys/syscall.h>
 
 #include "error.h"
 #include "event_loop.h"
@@ -57,7 +56,7 @@ static void accept_connection(h2o_socket_t *listener, const char *err)
 				sock->on_close.cb = on_close_connection;
 				sock->on_close.data = &ctx->event_loop.conn_num;
 				h2o_accept(&ctx->event_loop.h2o_accept_ctx, sock);
-			} while (++accepted < ctx->global_data->config->max_accept);
+			} while (++accepted < ctx->config->max_accept);
 		}
 	}
 }
@@ -96,11 +95,11 @@ static void process_messages(h2o_multithread_receiver_t *receiver, h2o_linklist_
 {
 	IGNORE_FUNCTION_PARAMETER(messages);
 
-	thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
-	                                                      event_loop.h2o_receiver,
-	                                                      receiver);
+	global_thread_data_t * const global_thread_data = H2O_STRUCT_FROM_MEMBER(global_thread_data_t,
+	                                                                         h2o_receiver,
+	                                                                         receiver);
 
-	h2o_socket_read_stop(ctx->event_loop.h2o_socket);
+	h2o_socket_read_stop(global_thread_data->ctx->event_loop.h2o_socket);
 }
 
 static void shutdown_server(h2o_socket_t *listener, const char *err)
@@ -113,23 +112,20 @@ static void shutdown_server(h2o_socket_t *listener, const char *err)
 		ctx->global_data->shutdown = true;
 		h2o_socket_read_stop(ctx->event_loop.h2o_socket);
 
-		for (size_t i = 1; i < ctx->global_data->config->thread_num; i++)
-			h2o_multithread_send_message(&ctx[i].event_loop.h2o_receiver, NULL);
+		for (size_t i = 1; i < ctx->config->thread_num; i++)
+			h2o_multithread_send_message(&ctx->global_thread_data[i].h2o_receiver, NULL);
 	}
 }
 
 void event_loop(thread_context_t *ctx)
 {
-	ctx->tid = syscall(SYS_gettid);
-	ctx->random_seed = ctx->tid;
-
 	while (!ctx->global_data->shutdown || ctx->event_loop.conn_num)
 		h2o_evloop_run(ctx->event_loop.h2o_ctx.loop);
 }
 
-void free_event_loop(event_loop_t *event_loop)
+void free_event_loop(event_loop_t *event_loop, h2o_multithread_receiver_t *h2o_receiver)
 {
-	h2o_multithread_unregister_receiver(event_loop->h2o_ctx.queue, &event_loop->h2o_receiver);
+	h2o_multithread_unregister_receiver(event_loop->h2o_ctx.queue, h2o_receiver);
 	h2o_socket_close(event_loop->h2o_socket);
 	h2o_socket_close(event_loop->epoll_socket);
 	h2o_context_dispose(&event_loop->h2o_ctx);
@@ -137,6 +133,7 @@ void free_event_loop(event_loop_t *event_loop)
 
 void initialize_event_loop(bool is_main_thread,
                            global_data_t *global_data,
+                           h2o_multithread_receiver_t *h2o_receiver,
                            event_loop_t *loop)
 {
 	memset(loop, 0, sizeof(*loop));
@@ -165,7 +162,7 @@ void initialize_event_loop(bool is_main_thread,
 	loop->h2o_socket->data = loop;
 	h2o_socket_read_start(loop->h2o_socket, accept_connection);
 	h2o_multithread_register_receiver(loop->h2o_ctx.queue,
-	                                  &loop->h2o_receiver,
+	                                  h2o_receiver,
 	                                  process_messages);
 	// libh2o's event loop does not support write polling unless it
 	// controls sending the data as well, so do read polling on the

+ 2 - 2
frameworks/C/h2o/src/event_loop.h

@@ -35,13 +35,13 @@ typedef struct {
 	int epoll_fd;
 	h2o_accept_ctx_t h2o_accept_ctx;
 	h2o_context_t h2o_ctx;
-	h2o_multithread_receiver_t h2o_receiver;
 } event_loop_t;
 
 void event_loop(thread_context_t *ctx);
-void free_event_loop(event_loop_t *event_loop);
+void free_event_loop(event_loop_t *event_loop, h2o_multithread_receiver_t *h2o_receiver);
 void initialize_event_loop(bool is_main_thread,
                            global_data_t *global_data,
+                           h2o_multithread_receiver_t *h2o_receiver,
                            event_loop_t *loop);
 int start_write_polling(int fd,
                         void (**on_write_ready)(void *),

+ 21 - 15
frameworks/C/h2o/src/main.c

@@ -24,6 +24,7 @@
 #include <netdb.h>
 #include <signal.h>
 #include <stdarg.h>
+#include <stdbool.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -44,6 +45,7 @@
 #include "tls.h"
 #include "utility.h"
 
+#define DEFAULT_CACHE_LINE_SIZE 128
 #define DEFAULT_TCP_FASTOPEN_QUEUE_LEN 4096
 #define USAGE_MESSAGE \
 	"Usage:\n%s [-a <max connections accepted simultaneously>] [-b <bind address>] " \
@@ -63,8 +65,12 @@ static void setup_process(void);
 
 static void free_global_data(global_data_t *global_data)
 {
-	if (global_data->ctx)
-		free_thread_contexts(global_data);
+	if (global_data->global_thread_data) {
+		for (size_t i = 1; i < global_data->global_thread_data->config->thread_num; i++)
+			CHECK_ERROR(pthread_join, global_data->global_thread_data[i].thread, NULL);
+
+		free(global_data->global_thread_data);
+	}
 
 	if (global_data->file_logger)
 		global_data->file_logger->dispose(global_data->file_logger);
@@ -177,14 +183,7 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
 	sigset_t signals;
 
 	memset(global_data, 0, sizeof(*global_data));
-	global_data->config = config;
 	global_data->memory_alignment = get_maximum_cache_line_size();
-
-	if (global_data->memory_alignment > DEFAULT_CACHE_LINE_SIZE) {
-		ERROR("Unexpected maximum cache line size.");
-		return EXIT_FAILURE;
-	}
-
 	CHECK_ERRNO(sigemptyset, &signals);
 #ifdef NDEBUG
 	CHECK_ERRNO(sigaddset, &signals, SIGINT);
@@ -199,7 +198,7 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
 		goto error;
 
 	if (config->cert && config->key)
-		initialize_openssl(global_data);
+		initialize_openssl(config, global_data);
 
 	const h2o_iovec_t host = h2o_iovec_init(H2O_STRLIT("default"));
 	h2o_hostconf_t * const hostconf = h2o_config_register_host(&global_data->h2o_config,
@@ -225,9 +224,9 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
 			global_data->file_logger = h2o_access_log_register(pathconf, log_handle);
 	}
 
-	global_data->ctx = initialize_thread_contexts(global_data);
+	global_data->global_thread_data = initialize_global_thread_data(config, global_data);
 
-	if (global_data->ctx) {
+	if (global_data->global_thread_data) {
 		printf("Number of processors: %zu\nMaximum cache line size: %zu\n",
 		       h2o_numproc(),
 		       global_data->memory_alignment);
@@ -362,10 +361,17 @@ int main(int argc, char *argv[])
 		global_data_t global_data;
 
 		if (initialize_global_data(&config, &global_data) == EXIT_SUCCESS) {
+			thread_context_t ctx;
+
 			setup_process();
-			start_threads(global_data.ctx);
-			connect_to_database(global_data.ctx);
-			event_loop(global_data.ctx);
+			start_threads(global_data.global_thread_data);
+			initialize_thread_context(global_data.global_thread_data, true, &ctx);
+			connect_to_database(&ctx);
+			event_loop(&ctx);
+			// Even though this is global data, we need to close
+			// it before the associated event loop is cleaned up.
+			h2o_socket_close(global_data.signals);
+			free_thread_context(&ctx);
 			free_global_data(&global_data);
 			rc = EXIT_SUCCESS;
 		}

+ 49 - 31
frameworks/C/h2o/src/thread.c

@@ -22,9 +22,12 @@
 #include <errno.h>
 #include <h2o.h>
 #include <pthread.h>
+#include <stdbool.h>
 #include <stdlib.h>
+#include <string.h>
 #include <h2o/serverutil.h>
 #include <sys/epoll.h>
+#include <sys/syscall.h>
 
 #include "database.h"
 #include "error.h"
@@ -35,71 +38,86 @@ static void *run_thread(void *arg);
 
 static void *run_thread(void *arg)
 {
-	connect_to_database(arg);
-	event_loop(arg);
+	thread_context_t ctx;
+
+	initialize_thread_context(arg, false, &ctx);
+	connect_to_database(&ctx);
+	event_loop(&ctx);
+	free_thread_context(&ctx);
 	pthread_exit(NULL);
 }
 
-void free_thread_contexts(global_data_t *global_data)
+void free_thread_context(thread_context_t *ctx)
 {
-	thread_context_t * const ctx = global_data->ctx;
-
-	for (size_t i = 0; i < ctx->global_data->config->thread_num; i++) {
-		if (i)
-			CHECK_ERROR(pthread_join, ctx[i].thread, NULL);
-		else
-			// Even though this is global data, we need to close
-			// it before the associated event loop is cleaned up.
-			h2o_socket_close(global_data->signals);
-
-		free_database_state(ctx[i].event_loop.h2o_ctx.loop, &ctx[i].db_state);
-		free_event_loop(&ctx[i].event_loop);
-	}
-
-	free(ctx);
+	free_database_state(ctx->event_loop.h2o_ctx.loop, &ctx->db_state);
+	free_event_loop(&ctx->event_loop, &ctx->global_thread_data->h2o_receiver);
 }
 
-thread_context_t *initialize_thread_contexts(global_data_t *global_data)
+global_thread_data_t *initialize_global_thread_data(const config_t *config,
+                                                    global_data_t *global_data)
 {
-	const size_t sz = global_data->config->thread_num * sizeof(thread_context_t);
-	thread_context_t * const ret = aligned_alloc(global_data->memory_alignment, sz);
+	const size_t sz = config->thread_num * sizeof(thread_context_t);
+	// The global thread data is modified only at program initialization and termination,
+	// and is not accessed by performance-sensitive code, so false sharing is not a concern.
+	global_thread_data_t * const ret = aligned_alloc(global_data->memory_alignment, sz);
 
 	if (ret) {
 		memset(ret, 0, sz);
 
-		for (size_t i = 0; i < global_data->config->thread_num; i++) {
+		for (size_t i = 0; i < config->thread_num; i++) {
+			ret[i].config = config;
 			ret[i].global_data = global_data;
-			initialize_event_loop(!i, global_data, &ret[i].event_loop);
-			initialize_database_state(ret[i].event_loop.h2o_ctx.loop, &ret[i].db_state);
 		}
 	}
 
 	return ret;
 }
 
-void start_threads(thread_context_t *ctx)
+void initialize_thread_context(global_thread_data_t *global_thread_data,
+                               bool is_main_thread,
+                               thread_context_t *ctx)
+{
+	memset(ctx, 0, sizeof(*ctx));
+	ctx->config = global_thread_data->config;
+	ctx->global_data = global_thread_data->global_data;
+	ctx->global_thread_data = global_thread_data;
+	ctx->tid = syscall(SYS_gettid);
+	ctx->random_seed = ctx->tid;
+	initialize_event_loop(is_main_thread,
+	                      global_thread_data->global_data,
+	                      &global_thread_data->h2o_receiver,
+	                      &ctx->event_loop);
+	initialize_database_state(ctx->event_loop.h2o_ctx.loop, &ctx->db_state);
+	global_thread_data->ctx = ctx;
+}
+
+void start_threads(global_thread_data_t *global_thread_data)
 {
 	const size_t num_cpus = h2o_numproc();
 
 	// The first thread context is used by the main thread.
-	ctx->thread = pthread_self();
+	global_thread_data->thread = pthread_self();
 
-	for (size_t i = 1; i < ctx->global_data->config->thread_num; i++)
-		CHECK_ERROR(pthread_create, &ctx[i].thread, NULL, run_thread, ctx + i);
+	for (size_t i = 1; i < global_thread_data->config->thread_num; i++)
+		CHECK_ERROR(pthread_create,
+		            &global_thread_data[i].thread,
+		            NULL,
+		            run_thread,
+		            global_thread_data + i);
 
 	// If the number of threads is not equal to the number of processors, then let the scheduler
 	// decide how to balance the load.
-	if (ctx->global_data->config->thread_num == num_cpus) {
+	if (global_thread_data->config->thread_num == num_cpus) {
 		const size_t cpusetsize = CPU_ALLOC_SIZE(num_cpus);
 		cpu_set_t * const cpuset = CPU_ALLOC(num_cpus);
 
 		if (!cpuset)
 			abort();
 
-		for (size_t i = 0; i < ctx->global_data->config->thread_num; i++) {
+		for (size_t i = 0; i < global_thread_data->config->thread_num; i++) {
 			CPU_ZERO_S(cpusetsize, cpuset);
 			CPU_SET_S(i, cpusetsize, cpuset);
-			CHECK_ERROR(pthread_setaffinity_np, ctx[i].thread, cpusetsize, cpuset);
+			CHECK_ERROR(pthread_setaffinity_np, global_thread_data[i].thread, cpusetsize, cpuset);
 		}
 
 		CPU_FREE(cpuset);

+ 20 - 12
frameworks/C/h2o/src/thread.h

@@ -24,33 +24,41 @@
 #include <assert.h>
 #include <h2o.h>
 #include <pthread.h>
+#include <stdbool.h>
 #include <sys/types.h>
 
 #include "database.h"
 #include "event_loop.h"
 #include "utility.h"
 
-#define DEFAULT_CACHE_LINE_SIZE 128
-
 typedef struct thread_context_t thread_context_t;
 
+typedef struct global_thread_data_t {
+	const config_t *config;
+	thread_context_t *ctx;
+	global_data_t *global_data;
+	h2o_multithread_receiver_t h2o_receiver;
+	pthread_t thread;
+} global_thread_data_t;
+
 struct thread_context_t {
+	const config_t *config;
 	global_data_t *global_data;
+	// global_thread_data contains config and global_data as well,
+	// but keep copies here to avoid some pointer chasing.
+	global_thread_data_t *global_thread_data;
 	unsigned random_seed;
 	pid_t tid;
 	db_state_t db_state;
 	event_loop_t event_loop;
-	pthread_t thread;
-	// Align on the cache line size to prevent false sharing.
-	char padding[89];
 };
 
-static_assert(!(sizeof(thread_context_t) % DEFAULT_CACHE_LINE_SIZE),
-              "The size of the thread_context_t structure must be a "
-              "multiple of the cache line size.");
-
-void free_thread_contexts(global_data_t *global_data);
-thread_context_t *initialize_thread_contexts(global_data_t *global_data);
-void start_threads(thread_context_t *ctx);
+void free_thread_context(thread_context_t *ctx);
+global_thread_data_t *initialize_global_thread_data(const config_t *config,
+                                                    global_data_t *global_data);
+void initialize_thread_context(global_thread_data_t *global_thread_data,
+                               bool is_main_thread,
+                               thread_context_t *ctx);
+void start_threads(global_thread_data_t *global_thread_data);
 
 #endif // THREAD_H_

+ 3 - 3
frameworks/C/h2o/src/tls.c

@@ -136,7 +136,7 @@ void cleanup_openssl(global_data_t *global_data)
 	CHECK_ERROR(pthread_mutexattr_destroy, &openssl_global_data.lock_attr);
 }
 
-void initialize_openssl(global_data_t *global_data)
+void initialize_openssl(const config_t *config, global_data_t *global_data)
 {
 	SSL_library_init();
 	SSL_load_error_strings();
@@ -160,10 +160,10 @@ void initialize_openssl(global_data_t *global_data)
 	global_data->ssl_ctx = SSL_CTX_new(TLSv1_2_server_method());
 	CHECK_OPENSSL_ERROR(SSL_CTX_use_certificate_file,
 	                    global_data->ssl_ctx,
-	                    global_data->config->cert,
+	                    config->cert,
 	                    SSL_FILETYPE_PEM);
 	CHECK_OPENSSL_ERROR(SSL_CTX_use_PrivateKey_file,
 	                    global_data->ssl_ctx,
-	                    global_data->config->key,
+	                    config->key,
 	                    SSL_FILETYPE_PEM);
 }

+ 1 - 1
frameworks/C/h2o/src/tls.h

@@ -24,6 +24,6 @@
 #include "utility.h"
 
 void cleanup_openssl(global_data_t *global_data);
-void initialize_openssl(global_data_t *global_data);
+void initialize_openssl(const config_t *config, global_data_t *global_data);
 
 #endif // TLS_H_

+ 2 - 3
frameworks/C/h2o/src/utility.h

@@ -36,7 +36,7 @@
 #define TOSTRING(x) # x
 #define YAJL_STRLIT(s) (const unsigned char *) (s), sizeof(s) - 1
 
-typedef struct thread_context_t thread_context_t;
+typedef struct global_thread_data_t global_thread_data_t;
 
 typedef struct {
 	const char *bind_address;
@@ -54,12 +54,11 @@ typedef struct {
 } config_t;
 
 typedef struct {
-	const config_t *config;
-	thread_context_t *ctx;
 	h2o_logger_t *file_logger;
 	mustache_template_t *fortunes_template;
 	h2o_socket_t *signals;
 	SSL_CTX *ssl_ctx;
+	global_thread_data_t *global_thread_data;
 	size_t memory_alignment;
 	int listener_sd;
 	int signal_fd;