소스 검색

H2O: Improve worker thread load balancing (#10128)

Use a simple eBPF program that assigns incoming connections to
worker threads in a mostly round-robin fashion.

Signed-off-by: Anton Kirilov <[email protected]>
Anton Kirilov 4 주 전
부모
커밋
31c1afbe43

+ 38 - 7
frameworks/C/h2o/CMakeLists.txt

@@ -1,5 +1,6 @@
 cmake_minimum_required(VERSION 3.18.0)
 project(h2o-app)
+find_library(BPF_LIB bpf REQUIRED)
 find_library(CRYPTO_LIB crypto REQUIRED)
 find_library(H2O_LIB h2o-evloop REQUIRED)
 find_library(MUSTACHE_C_LIB mustache_c REQUIRED)
@@ -8,27 +9,57 @@ find_library(PQ_LIB pq REQUIRED)
 find_library(SSL_LIB ssl REQUIRED)
 find_library(YAJL_LIB yajl REQUIRED)
 find_library(Z_LIB z REQUIRED)
+find_path(ASM_INCLUDE asm/types.h REQUIRED)
+find_path(BPF_INCLUDE bpf/libbpf.h REQUIRED)
 find_path(H2O_INCLUDE h2o.h REQUIRED)
 find_path(MUSTACHE_C_INCLUDE mustache.h REQUIRED)
 find_path(NUMA_INCLUDE numaif.h REQUIRED)
 find_path(OPENSSL_INCLUDE openssl/ssl.h REQUIRED)
 find_path(PQ_INCLUDE postgresql/libpq-fe.h REQUIRED)
 find_path(YAJL_INCLUDE yajl/yajl_gen.h REQUIRED)
-include_directories(src ${H2O_INCLUDE} ${MUSTACHE_C_INCLUDE} ${NUMA_INCLUDE} ${OPENSSL_INCLUDE})
-include_directories(${PQ_INCLUDE} ${YAJL_INCLUDE})
+find_program(BPFTOOL_BIN bpftool REQUIRED)
+find_program(CLANG_BIN clang REQUIRED)
+include_directories(src ${CMAKE_BINARY_DIR}/generated-headers ${BPF_INCLUDE} ${H2O_INCLUDE})
+include_directories(${MUSTACHE_C_INCLUDE} ${NUMA_INCLUDE} ${OPENSSL_INCLUDE} ${PQ_INCLUDE})
+include_directories(${YAJL_INCLUDE})
 set(CMAKE_C_STANDARD 11)
 set(CMAKE_C_STANDARD_REQUIRED ON)
 add_compile_definitions(H2O_USE_LIBUV=0)
 set(COMMON_OPTIONS -flto=auto -pthread)
-add_compile_options(-pedantic -Wall -Wextra ${COMMON_OPTIONS})
+set(WARNING_OPTIONS -pedantic -Wall -Wextra)
+add_compile_options(${COMMON_OPTIONS} ${WARNING_OPTIONS})
 set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -D_FORTIFY_SOURCE=2")
 set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -O3")
 set(CMAKE_C_FLAGS_RELWITHDEBINFO "${CMAKE_C_FLAGS_RELWITHDEBINFO} -O3")
-file(GLOB_RECURSE SOURCES "src/*.c")
-add_executable(${PROJECT_NAME} ${SOURCES})
+# Build the eBPF socket load balancer and wrap it in a generated header:
+#   1. create the directory that receives the generated header - the shell
+#      redirection in step 3 fails if it is missing, and not every generator
+#      creates the parent directories of custom command outputs;
+#   2. compile src/bpf/socket_load_balancer.c for the BPF target with Clang;
+#   3. run "bpftool gen skeleton" on the object file and redirect its standard
+#      output into the header.
+# The intermediate object file is declared as a byproduct, so that the build tool
+# knows about it and can remove it when cleaning.
+add_custom_command(
+  OUTPUT ${CMAKE_BINARY_DIR}/generated-headers/socket_load_balancer.h
+  COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_BINARY_DIR}/generated-headers
+  COMMAND ${CLANG_BIN}
+            -c
+            -DNDEBUG
+            -g
+            -I ${ASM_INCLUDE}
+            -I ${BPF_INCLUDE}
+            -mcpu=v3
+            -o ${CMAKE_BINARY_DIR}/socket_load_balancer.o
+            -O3
+            -std=gnu11
+            -target bpf
+            ${WARNING_OPTIONS}
+            ${CMAKE_CURRENT_SOURCE_DIR}/src/bpf/socket_load_balancer.c
+  COMMAND ${BPFTOOL_BIN} gen skeleton ${CMAKE_BINARY_DIR}/socket_load_balancer.o >
+            ${CMAKE_BINARY_DIR}/generated-headers/socket_load_balancer.h
+  BYPRODUCTS ${CMAKE_BINARY_DIR}/socket_load_balancer.o
+  DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/src/bpf/socket_load_balancer.c
+  VERBATIM)
+# Named target for the generated header; other targets can depend on it to make
+# sure that the header is produced before they are built.
+add_custom_target(generated_headers
+  DEPENDS
+    ${CMAKE_BINARY_DIR}/generated-headers/socket_load_balancer.h)
+# Collect the application sources. The handlers are gathered recursively, while
+# only the top level of src/ is globbed, so the eBPF program under src/bpf/ is
+# not compiled into the executable. CONFIGURE_DEPENDS makes the build re-check
+# the globs, so added or removed source files are picked up without a manual
+# re-configuration.
+file(GLOB_RECURSE HANDLER_SOURCES CONFIGURE_DEPENDS "src/handlers/*.c")
+file(GLOB SOURCES CONFIGURE_DEPENDS "src/*.c")
+add_executable(${PROJECT_NAME} ${HANDLER_SOURCES} ${SOURCES})
+# The application includes the generated socket_load_balancer.h header, so the
+# header must exist before any of the sources are compiled.
+add_dependencies(${PROJECT_NAME} generated_headers)
 target_link_libraries(${PROJECT_NAME} ${COMMON_OPTIONS})
-target_link_libraries(${PROJECT_NAME} ${H2O_LIB} m ${MUSTACHE_C_LIB} ${NUMA_LIB} ${PQ_LIB})
-target_link_libraries(${PROJECT_NAME} ${SSL_LIB} ${CRYPTO_LIB} ${YAJL_LIB} ${Z_LIB})
+target_link_libraries(${PROJECT_NAME} ${BPF_LIB} ${H2O_LIB} m ${MUSTACHE_C_LIB} ${NUMA_LIB})
+target_link_libraries(${PROJECT_NAME} ${PQ_LIB} ${SSL_LIB} ${CRYPTO_LIB} ${YAJL_LIB} ${Z_LIB})
 install(TARGETS ${PROJECT_NAME} RUNTIME DESTINATION bin)
 file(GLOB TEMPLATES "template/*")
 install(FILES ${TEMPLATES} DESTINATION share/${PROJECT_NAME}/template)

+ 14 - 5
frameworks/C/h2o/README.md

@@ -1,13 +1,22 @@
-# h2o
+# h2o-app
 
 This is a framework implementation using the [H2O](https://h2o.examp1e.net) HTTP server. It
 builds directly on top of `libh2o` instead of running the standalone server.
 
 ## Requirements
 
-[CMake](https://cmake.org), [H2O](https://h2o.examp1e.net), [libpq](https://www.postgresql.org),
-[mustache-c](https://github.com/x86-64/mustache-c), [numactl](https://github.com/numactl/numactl),
-[OpenSSL](https://www.openssl.org), [YAJL](https://lloyd.github.io/yajl)
+[bpftool](https://bpftool.dev/),
+[Clang](https://clang.llvm.org/),
+[CMake](https://cmake.org/),
+[GNU C Library](https://www.gnu.org/software/libc),
+[H2O](https://h2o.examp1e.net/),
+[libbpf](https://github.com/libbpf/libbpf),
+[libpq](https://www.postgresql.org/),
+[Linux](https://kernel.org/),
+[mustache-c](https://github.com/x86-64/mustache-c),
+[numactl](https://github.com/numactl/numactl),
+[OpenSSL](https://www.openssl.org/),
+[YAJL](https://lloyd.github.io/yajl)
 
 ## Test implementations
 
@@ -30,4 +39,4 @@ options respectively.
 
 ## Contact
 
-Anton Kirilov <antonvkirilov@gmail.com>
+Anton Kirilov <antonvkirilov@proton.me>

+ 11 - 6
frameworks/C/h2o/h2o.dockerfile

@@ -6,14 +6,19 @@ FROM "ubuntu:${UBUNTU_VERSION}" AS compile
 
 RUN echo "[timing] Installing system packages: $(date)"
 ARG DEBIAN_FRONTEND=noninteractive
-RUN apt-get -yqq update && \
-    apt-get -yqq install \
+RUN apt-get install \
+      --no-install-recommends \
+      -qqUy \
       autoconf \
+      automake \
       bison \
+      bpftool \
       clang \
       cmake \
       curl \
       flex \
+      gcc \
+      libbpf-dev \
       libbrotli-dev \
       libcap-dev \
       libnuma-dev \
@@ -40,9 +45,7 @@ RUN curl -LSs "https://github.com/h2o/h2o/archive/${H2O_VERSION}.tar.gz" | \
       tar --strip-components=1 -xz && \
     cmake \
       -B build \
-      -DCMAKE_AR=/usr/bin/gcc-ar \
       -DCMAKE_C_FLAGS="-flto=auto -march=native -mtune=native" \
-      -DCMAKE_RANLIB=/usr/bin/gcc-ranlib \
       -DWITH_MRUBY=on \
       -G Ninja \
       -S . && \
@@ -79,8 +82,10 @@ FROM "ubuntu:${UBUNTU_VERSION}"
 
 RUN echo "[timing] Installing final system packages: $(date)"
 ARG DEBIAN_FRONTEND=noninteractive
-RUN apt-get -yqq update && \
-    apt-get -yqq install \
+RUN apt-get install \
+      --no-install-recommends \
+      -qqUy \
+      libbpf1 \
       libnuma1 \
       libpq5 \
       liburing2 \

+ 58 - 0
frameworks/C/h2o/src/bpf/socket_load_balancer.c

@@ -0,0 +1,58 @@
+/*
+ Copyright (c) 2025 Anton Valentinov Kirilov
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
+ associated documentation files (the "Software"), to deal in the Software without restriction,
+ including without limitation the rights to use, copy, modify, merge, publish, distribute,
+ sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all copies or
+ substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
+ NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT
+ OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+// TODO: Switch to the standard atomics (<stdatomic.h>) after
+// the system header file mess gets sorted for eBPF.
+#include <stdbool.h>
+#include <stddef.h>
+#include <linux/bpf.h>
+#include <bpf/bpf_helpers.h>
+
+// We need a finite number of iterations to keep the eBPF verifier happy.
+#define MAX_ITERATIONS 42
+
+static size_t thread_idx;
+size_t thread_num = 1;
+
+SEC("socket") int socket_load_balancer(void *skb)
+{
+	// The packet contents play no part in the decision.
+	(void) skb;
+
+	// Take a snapshot of the shared cursor with a plain volatile read.
+	// TODO: Use __atomic_load_n() after LLVM starts supporting it for eBPF.
+	size_t current = *(const volatile size_t *) &thread_idx;
+	// Value returned when every attempt below fails: thread_num itself.
+	// NOTE(review): presumably an out-of-range index that makes the kernel
+	// fall back to its default socket selection - confirm.
+	int selected = thread_num;
+
+	__atomic_thread_fence(__ATOMIC_RELAXED);
+
+	// The attempts are bounded in order to keep the eBPF verifier happy.
+	for (size_t attempt = 0; attempt < MAX_ITERATIONS; attempt++) {
+		const size_t next = (current + 1) % thread_num;
+		const bool advanced = __atomic_compare_exchange_n(&thread_idx,
+		                                                  &current,
+		                                                  next,
+		                                                  false,
+		                                                  __ATOMIC_RELAXED,
+		                                                  __ATOMIC_RELAXED);
+
+		// A failed exchange stores the latest cursor value in `current`,
+		// so the next attempt starts from up-to-date state.
+		if (!advanced)
+			continue;
+
+		// This invocation moved the cursor from `current` to `next`,
+		// so `current` is the index that it hands out.
+		selected = current;
+		break;
+	}
+
+	return selected;
+}

+ 33 - 10
frameworks/C/h2o/src/event_loop.c

@@ -46,11 +46,16 @@
 
 static void accept_connection(h2o_socket_t *listener, const char *err);
 static void accept_http_connection(h2o_socket_t *listener, const char *err);
-static int get_listener_socket(const char *bind_address, uint16_t port);
+static int get_listener_socket(bool is_main_thread,
+                               int bpf_fd,
+                               const char *bind_address,
+                               uint16_t port);
 static void on_close_connection(void *data);
 static void process_messages(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages);
 static void shutdown_server(h2o_socket_t *listener, const char *err);
-static void start_accept_polling(const config_t *config,
+static void start_accept_polling(bool is_main_thread,
+                                 int bpf_fd,
+                                 const config_t *config,
                                  h2o_socket_cb accept_cb,
                                  bool is_https,
                                  event_loop_t *loop);
@@ -98,7 +103,10 @@ static void accept_http_connection(h2o_socket_t *listener, const char *err)
 	ctx->event_loop.h2o_accept_ctx.ssl_ctx = ssl_ctx;
 }
 
-static int get_listener_socket(const char *bind_address, uint16_t port)
+static int get_listener_socket(bool is_main_thread,
+                               int bpf_fd,
+                               const char *bind_address,
+                               uint16_t port)
 {
 	int ret = -1;
 	char buf[16];
@@ -148,6 +156,15 @@ static int get_listener_socket(const char *bind_address, uint16_t port)
 		LOCAL_CHECK_ERRNO(setsockopt, s, IPPROTO_TCP, TCP_FASTOPEN, &option, sizeof(option));
 		LOCAL_CHECK_ERRNO(bind, s, iter->ai_addr, iter->ai_addrlen);
 		LOCAL_CHECK_ERRNO(listen, s, INT_MAX);
+
+		if (is_main_thread && bpf_fd >= 0)
+			LOCAL_CHECK_ERRNO(setsockopt,
+			                  s,
+			                  SOL_SOCKET,
+			                  SO_ATTACH_REUSEPORT_EBPF,
+			                  &bpf_fd,
+			                  sizeof(bpf_fd));
+
 		ret = s;
 		break;
 
@@ -256,16 +273,17 @@ static void shutdown_server(h2o_socket_t *listener, const char *err)
 	}
 }
 
-static void start_accept_polling(const config_t *config,
+static void start_accept_polling(bool is_main_thread,
+                                 int bpf_fd,
+                                 const config_t *config,
                                  h2o_socket_cb accept_cb,
                                  bool is_https,
                                  event_loop_t *loop)
 {
-	const int listener_sd = get_listener_socket(config->bind_address,
+	const int listener_sd = get_listener_socket(is_main_thread,
+	                                            bpf_fd,
+	                                            config->bind_address,
 	                                            is_https ? config->https_port : config->port);
-	// Let all the threads race to call accept() on the socket; since the latter is
-	// non-blocking, that will virtually act as load balancing, and SO_REUSEPORT
-	// will make it efficient.
 	h2o_socket_t * const h2o_socket = h2o_evloop_socket_create(loop->h2o_ctx.loop,
 	                                                           listener_sd,
 	                                                           H2O_SOCKET_FLAG_DONT_READ);
@@ -345,13 +363,18 @@ void initialize_event_loop(bool is_main_thread,
 
 	if (global_data->ssl_ctx) {
 		loop->h2o_accept_ctx.ssl_ctx = global_data->ssl_ctx;
-		start_accept_polling(config, accept_connection, true, loop);
+		start_accept_polling(is_main_thread,
+		                     global_data->bpf_fd,
+		                     config,
+		                     accept_connection,
+		                     true,
+		                     loop);
 		// Assume that the majority of the connections use HTTPS,
 		// so HTTP can take a few extra operations.
 		accept_cb = accept_http_connection;
 	}
 
-	start_accept_polling(config, accept_cb, false, loop);
+	start_accept_polling(is_main_thread, global_data->bpf_fd, config, accept_cb, false, loop);
 	h2o_multithread_register_receiver(loop->h2o_ctx.queue,
 	                                  h2o_receiver,
 	                                  process_messages);

+ 3 - 0
frameworks/C/h2o/src/global_data.h

@@ -30,6 +30,7 @@
 #include "handlers/request_handler_data.h"
 
 struct global_thread_data_t;
+struct socket_load_balancer;
 struct thread_context_t;
 
 typedef struct config_t {
@@ -55,8 +56,10 @@ typedef struct {
 	h2o_logger_t *file_logger;
 	struct global_thread_data_t *global_thread_data;
 	h2o_socket_t *signals;
+	struct socket_load_balancer *socket_load_balancer;
 	SSL_CTX *ssl_ctx;
 	size_t memory_alignment;
+	int bpf_fd;
 	int signal_fd;
 	h2o_buffer_prototype_t buffer_prototype;
 	h2o_globalconf_t h2o_config;

+ 20 - 0
frameworks/C/h2o/src/main.c

@@ -26,6 +26,8 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
+#include <bpf/bpf.h>
+#include <bpf/libbpf.h>
 #include <h2o/serverutil.h>
 #include <sys/resource.h>
 #include <sys/signalfd.h>
@@ -38,6 +40,7 @@
 #include "global_data.h"
 #include "list.h"
 #include "request_handler.h"
+#include "socket_load_balancer.h"
 #include "thread.h"
 #include "tls.h"
 #include "utility.h"
@@ -81,6 +84,8 @@ static void free_global_data(global_data_t *global_data)
 	if (global_data->file_logger)
 		global_data->file_logger->dispose(global_data->file_logger);
 
+	close(global_data->bpf_fd);
+	socket_load_balancer__destroy(global_data->socket_load_balancer);
 	cleanup_request_handlers(&global_data->request_handler_data);
 	h2o_config_dispose(&global_data->h2o_config);
 
@@ -93,6 +98,7 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
 	sigset_t signals;
 
 	memset(global_data, 0, sizeof(*global_data));
+	global_data->bpf_fd = -1;
 	global_data->buffer_prototype._initial_buf.capacity = H2O_SOCKET_INITIAL_INPUT_BUFFER_SIZE;
 	global_data->memory_alignment = get_maximum_cache_line_size();
 	CHECK_ERRNO(sigemptyset, &signals);
@@ -134,6 +140,20 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
 			global_data->file_logger = h2o_access_log_register(pathconf, log_handle);
 	}
 
+	global_data->socket_load_balancer = socket_load_balancer__open();
+
+	if (global_data->socket_load_balancer) {
+		global_data->socket_load_balancer->data->thread_num = config->thread_num;
+
+		if (socket_load_balancer__load(global_data->socket_load_balancer)) {
+			socket_load_balancer__destroy(global_data->socket_load_balancer);
+			global_data->socket_load_balancer = NULL;
+		}
+		else
+			global_data->bpf_fd =
+				bpf_program__fd(global_data->socket_load_balancer->progs.socket_load_balancer);
+	}
+
 	global_data->global_thread_data = initialize_global_thread_data(config, global_data);
 
 	if (global_data->global_thread_data) {