Browse Source

H2O: Improve load balancing and other enhancements (#2607)

* H2O: Make several improvements

* Add timestamps to error messages.
* Make some defensive changes against integer overflow.
* Use a fixed number for the maximum number of events that may be
returned by epoll_wait() instead of one that depends on the number
of database connections, since write polling may not be used only
by the database code.

* H2O: Set the SO_REUSEPORT socket option on listening sockets

SO_REUSEPORT improves load balancing, especially when the
application server has a large number of CPU cores and/or
there are a lot of incoming connections.
Anton Kirilov 8 years ago
parent
commit
a2b18227d6

+ 4 - 1
frameworks/C/h2o/src/database.c

@@ -161,6 +161,7 @@ static void on_database_connect_timeout(h2o_timeout_entry_t *entry)
 {
 	db_conn_t * const db_conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, h2o_timeout_entry, entry);
 
+	ERROR(DB_TIMEOUT_ERROR);
 	on_database_connect_error(db_conn, true, DB_TIMEOUT_ERROR);
 }
 
@@ -249,6 +250,8 @@ static void on_database_timeout(h2o_timeout_entry_t *entry)
 {
 	db_conn_t * const db_conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, h2o_timeout_entry, entry);
 
+	ERROR(DB_TIMEOUT_ERROR);
+
 	if (db_conn->param) {
 		db_conn->param->on_timeout(db_conn->param);
 		db_conn->param = NULL;
@@ -465,7 +468,7 @@ static void stop_database_write_polling(db_conn_t *db_conn)
 
 void connect_to_database(thread_context_t *ctx)
 {
-	for (size_t i = ctx->db_state.db_conn_num; i < ctx->config->max_db_conn_num; i++)
+	for (size_t i = ctx->config->max_db_conn_num - ctx->db_state.db_conn_num; i > 0; i--)
 		start_database_connect(ctx, NULL);
 }
 

+ 12 - 1
frameworks/C/h2o/src/error.c

@@ -22,6 +22,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <time.h>
 #include <unistd.h>
 #include <sys/syscall.h>
 
@@ -34,12 +35,22 @@ void print_error(const char *file,
                  ...)
 {
 	char * const file_name = strdup(file);
+	const long tid = syscall(SYS_gettid);
+	struct timespec t = {.tv_sec = 0};
 
 	if (file_name)
 		file = basename(file_name);
 
+	clock_gettime(CLOCK_REALTIME, &t);
 	flockfile(stderr);
-	fprintf(stderr, "[%d] %s: %u: %s(): ", (int) syscall(SYS_gettid), file, line, function);
+	fprintf(stderr,
+	        "%010lld.%09ld [%ld] %s: %u: %s(): ",
+	        (long long) t.tv_sec,
+	        t.tv_nsec,
+	        tid,
+	        file,
+	        line,
+	        function);
 
 	va_list arg;
 

+ 109 - 34
frameworks/C/h2o/src/event_loop.c

@@ -18,29 +18,42 @@
 */
 
 #include <errno.h>
-#include <fcntl.h>
 #include <h2o.h>
+#include <inttypes.h>
+#include <limits.h>
+#include <netdb.h>
+#include <stdarg.h>
 #include <stdbool.h>
 #include <stdint.h>
+#include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <openssl/ssl.h>
 #include <sys/epoll.h>
+#include <sys/socket.h>
+#include <sys/types.h>
 
 #include "error.h"
 #include "event_loop.h"
 #include "thread.h"
 #include "utility.h"
 
+#define DEFAULT_TCP_FASTOPEN_QUEUE_LEN 4096
+#define MAX_EPOLL_EVENTS 64
+
 static void accept_connection(h2o_socket_t *listener, const char *err);
 static void accept_http_connection(h2o_socket_t *listener, const char *err);
 static void do_epoll_wait(h2o_socket_t *epoll_sock, const char *err);
+static int get_listener_socket(const char *bind_address, uint16_t port);
 static void on_close_connection(void *data);
 static void process_messages(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages);
 static void shutdown_server(h2o_socket_t *listener, const char *err);
-static void start_accept_polling(bool is_main_thread,
+static void start_accept_polling(const config_t *config,
+                                 h2o_socket_cb accept_cb,
                                  bool is_https,
-                                 global_data_t *global_data,
                                  event_loop_t *loop);
 
 static void accept_connection(h2o_socket_t *listener, const char *err)
@@ -53,7 +66,7 @@ static void accept_connection(h2o_socket_t *listener, const char *err)
 		                                                      listener->data);
 
 		if (!ctx->global_data->shutdown) {
-			size_t accepted = 0;
+			size_t accepted = ctx->config->max_accept;
 
 			do {
 				h2o_socket_t * const sock = h2o_evloop_socket_accept(listener);
@@ -65,7 +78,7 @@ static void accept_connection(h2o_socket_t *listener, const char *err)
 				sock->on_close.cb = on_close_connection;
 				sock->on_close.data = &ctx->event_loop.conn_num;
 				h2o_accept(&ctx->event_loop.h2o_accept_ctx, sock);
-			} while (++accepted < ctx->config->max_accept);
+			} while (--accepted > 0);
 		}
 	}
 }
@@ -91,23 +104,93 @@ static void do_epoll_wait(h2o_socket_t *epoll_sock, const char *err)
 		thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
 		                                                      event_loop,
 		                                                      epoll_sock->data);
-		const size_t num_event = ctx->db_state.db_conn_num - ctx->db_state.free_db_conn_num;
 		int ready;
-		struct epoll_event event[num_event];
+		struct epoll_event event[MAX_EPOLL_EVENTS];
 
 		do
-			ready = epoll_wait(ctx->event_loop.epoll_fd, event, num_event, 0);
+			ready = epoll_wait(ctx->event_loop.epoll_fd, event, MAX_EPOLL_EVENTS, 0);
 		while (ready < 0 && errno == EINTR);
 
 		if (ready > 0)
-			for (size_t i = 0; i < (size_t) ready; i++) {
-				void (** const on_write_ready)(void *) = event[i].data.ptr;
+			do {
+				void (** const on_write_ready)(void *) = event[--ready].data.ptr;
 
 				(*on_write_ready)(on_write_ready);
-			}
+			} while (ready > 0);
+		else if (ready < 0)
+			STANDARD_ERROR("epoll_wait");
 	}
 }
 
+static int get_listener_socket(const char *bind_address, uint16_t port)
+{
+	int ret = -1;
+	char buf[16];
+
+	if ((size_t) snprintf(buf, sizeof(buf), "%" PRIu16, port) >= sizeof(buf)) {
+		LIBRARY_ERROR("snprintf", "Truncated output.");
+		return ret;
+	}
+
+	struct addrinfo *res = NULL;
+	struct addrinfo hints = {.ai_socktype = SOCK_STREAM, .ai_flags = AI_PASSIVE};
+	const int error_code = getaddrinfo(bind_address, buf, &hints, &res);
+
+	if (error_code) {
+		LIBRARY_ERROR("getaddrinfo", gai_strerror(error_code));
+		return ret;
+	}
+
+	struct addrinfo *iter = res;
+
+	for (; iter; iter = iter->ai_next) {
+		const int s = socket(iter->ai_family,
+		                     iter->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
+		                     iter->ai_protocol);
+
+		if (s == -1) {
+			STANDARD_ERROR("socket");
+			continue;
+		}
+
+#define LOCAL_CHECK_ERRNO(function, ...) \
+	do { \
+		const int error_code = (function)(__VA_ARGS__); \
+		\
+		if (error_code) { \
+			print_library_error(__FILE__, __LINE__, #function, errno); \
+			goto error; \
+		} \
+	} while(0)
+
+		int option = 1;
+
+		LOCAL_CHECK_ERRNO(setsockopt, s, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));
+		LOCAL_CHECK_ERRNO(setsockopt, s, SOL_SOCKET, SO_REUSEPORT, &option, sizeof(option));
+		LOCAL_CHECK_ERRNO(setsockopt, s, IPPROTO_TCP, TCP_QUICKACK, &option, sizeof(option));
+		option = H2O_DEFAULT_HANDSHAKE_TIMEOUT_IN_SECS;
+		LOCAL_CHECK_ERRNO(setsockopt, s, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option));
+		option = DEFAULT_TCP_FASTOPEN_QUEUE_LEN;
+		LOCAL_CHECK_ERRNO(setsockopt, s, IPPROTO_TCP, TCP_FASTOPEN, &option, sizeof(option));
+		LOCAL_CHECK_ERRNO(bind, s, iter->ai_addr, iter->ai_addrlen);
+		LOCAL_CHECK_ERRNO(listen, s, INT_MAX);
+		ret = s;
+		break;
+
+#undef LOCAL_CHECK_ERRNO
+
+error:
+		close(s);
+	}
+
+	freeaddrinfo(res);
+
+	if (ret == -1)
+		abort();
+
+	return ret;
+}
+
 static void on_close_connection(void *data)
 {
 	size_t * const conn_num = data;
@@ -138,28 +221,21 @@ static void shutdown_server(h2o_socket_t *listener, const char *err)
 		ctx->global_data->shutdown = true;
 		h2o_socket_read_stop(ctx->event_loop.h2o_socket);
 
-		for (size_t i = 1; i < ctx->config->thread_num; i++)
+		for (size_t i = ctx->config->thread_num - 1; i > 0; i--)
 			h2o_multithread_send_message(&ctx->global_thread_data[i].h2o_receiver, NULL);
 	}
 }
 
-static void start_accept_polling(bool is_main_thread,
+static void start_accept_polling(const config_t *config,
+                                 h2o_socket_cb accept_cb,
                                  bool is_https,
-                                 global_data_t *global_data,
                                  event_loop_t *loop)
 {
-	int listener_sd = is_https ? global_data->https_listener_sd : global_data->listener_sd;
-
-	if (!is_main_thread) {
-		int flags;
-
-		CHECK_ERRNO_RETURN(listener_sd, dup, listener_sd);
-		CHECK_ERRNO_RETURN(flags, fcntl, listener_sd, F_GETFD);
-		CHECK_ERRNO(fcntl, listener_sd, F_SETFD, flags | FD_CLOEXEC);
-	}
-
+	const int listener_sd = get_listener_socket(config->bind_address,
+	                                            is_https ? config->https_port : config->port);
 	// Let all the threads race to call accept() on the socket; since the latter is
-	// non-blocking, that will effectively act as load balancing.
+	// non-blocking, that will virtually act as load balancing, and SO_REUSEPORT
+	// will make it efficient.
 	h2o_socket_t * const h2o_socket = h2o_evloop_socket_create(loop->h2o_ctx.loop,
 	                                                           listener_sd,
 	                                                           H2O_SOCKET_FLAG_DONT_READ);
@@ -170,13 +246,6 @@ static void start_accept_polling(bool is_main_thread,
 		loop->h2o_socket = h2o_socket;
 
 	h2o_socket->data = loop;
-
-	// Assume that the majority of the connections use HTTPS, so HTTP can take a few
-	// extra operations.
-	const h2o_socket_cb accept_cb = !global_data->ssl_ctx || is_https ?
-	                                accept_connection :
-	                                accept_http_connection;
-
 	h2o_socket_read_start(h2o_socket, accept_cb);
 }
 
@@ -202,6 +271,9 @@ void initialize_event_loop(bool is_main_thread,
                            h2o_multithread_receiver_t *h2o_receiver,
                            event_loop_t *loop)
 {
+	h2o_socket_cb accept_cb = accept_connection;
+	const config_t * const config = global_data->global_thread_data->config;
+
 	memset(loop, 0, sizeof(*loop));
 	h2o_context_init(&loop->h2o_ctx, h2o_evloop_create(), &global_data->h2o_config);
 	loop->h2o_accept_ctx.ctx = &loop->h2o_ctx;
@@ -209,10 +281,13 @@ void initialize_event_loop(bool is_main_thread,
 
 	if (global_data->ssl_ctx) {
 		loop->h2o_accept_ctx.ssl_ctx = global_data->ssl_ctx;
-		start_accept_polling(is_main_thread, true, global_data, loop);
+		start_accept_polling(config, accept_connection, true, loop);
+		// Assume that the majority of the connections use HTTPS,
+		// so HTTP can take a few extra operations.
+		accept_cb = accept_http_connection;
 	}
 
-	start_accept_polling(is_main_thread, false, global_data, loop);
+	start_accept_polling(config, accept_cb, false, loop);
 	h2o_multithread_register_receiver(loop->h2o_ctx.queue,
 	                                  h2o_receiver,
 	                                  process_messages);

+ 3 - 87
frameworks/C/h2o/src/main.c

@@ -17,26 +17,19 @@
  OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
 
-#include <assert.h>
 #include <errno.h>
 #include <h2o.h>
 #include <mustache.h>
-#include <netdb.h>
+#include <pthread.h>
 #include <signal.h>
-#include <stdarg.h>
-#include <stdbool.h>
-#include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
 #include <h2o/serverutil.h>
-#include <netinet/tcp.h>
 #include <sys/resource.h>
 #include <sys/signalfd.h>
-#include <sys/socket.h>
 #include <sys/time.h>
-#include <sys/types.h>
 
 #include "error.h"
 #include "event_loop.h"
@@ -47,7 +40,6 @@
 #include "utility.h"
 
 #define DEFAULT_CACHE_LINE_SIZE 128
-#define DEFAULT_TCP_FASTOPEN_QUEUE_LEN 4096
 #define USAGE_MESSAGE \
 	"Usage:\n%s [-a <max connections accepted simultaneously>] [-b <bind address>] " \
 	"[-c <certificate file>] [-d <database connection string>] [-f fortunes template file path] " \
@@ -57,7 +49,6 @@
 	"[-s <HTTPS port>] [-t <thread number>]\n"
 
 static void free_global_data(global_data_t *global_data);
-static int get_listener_socket(const char *bind_address, uint16_t port);
 static size_t get_maximum_cache_line_size(void);
 static int initialize_global_data(const config_t *config, global_data_t *global_data);
 static int parse_options(int argc, char *argv[], config_t *config);
@@ -67,7 +58,7 @@ static void setup_process(void);
 static void free_global_data(global_data_t *global_data)
 {
 	if (global_data->global_thread_data) {
-		for (size_t i = 1; i < global_data->global_thread_data->config->thread_num; i++)
+		for (size_t i = global_data->global_thread_data->config->thread_num - 1; i > 0; i--)
 			CHECK_ERROR(pthread_join, global_data->global_thread_data[i].thread, NULL);
 
 		free(global_data->global_thread_data);
@@ -88,70 +79,6 @@ static void free_global_data(global_data_t *global_data)
 		cleanup_openssl(global_data);
 }
 
-static int get_listener_socket(const char *bind_address, uint16_t port)
-{
-	int ret = -1;
-	char buf[16];
-
-	if ((size_t) snprintf(buf, sizeof(buf), "%" PRIu16, port) >= sizeof(buf)) {
-		LIBRARY_ERROR("snprintf", "Truncated output.");
-		return ret;
-	}
-
-	struct addrinfo *res = NULL;
-	struct addrinfo hints = {.ai_socktype = SOCK_STREAM, .ai_flags = AI_PASSIVE};
-	const int error_code = getaddrinfo(bind_address, buf, &hints, &res);
-
-	if (error_code) {
-		LIBRARY_ERROR("getaddrinfo", gai_strerror(error_code));
-		return ret;
-	}
-
-	struct addrinfo *iter = res;
-
-	for (; iter; iter = iter->ai_next) {
-		const int s = socket(iter->ai_family,
-		                     iter->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
-		                     iter->ai_protocol);
-
-		if (s == -1) {
-			STANDARD_ERROR("socket");
-			continue;
-		}
-
-#define LOCAL_CHECK_ERRNO(function, ...) \
-	do { \
-		const int error_code = (function)(__VA_ARGS__); \
-		\
-		if (error_code) { \
-			print_library_error(__FILE__, __LINE__, #function, errno); \
-			goto error; \
-		} \
-	} while(0)
-
-		int option = 1;
-
-		LOCAL_CHECK_ERRNO(setsockopt, s, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));
-		LOCAL_CHECK_ERRNO(setsockopt, s, IPPROTO_TCP, TCP_QUICKACK, &option, sizeof(option));
-		option = H2O_DEFAULT_HANDSHAKE_TIMEOUT_IN_SECS;
-		LOCAL_CHECK_ERRNO(setsockopt, s, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option));
-		option = DEFAULT_TCP_FASTOPEN_QUEUE_LEN;
-		LOCAL_CHECK_ERRNO(setsockopt, s, IPPROTO_TCP, TCP_FASTOPEN, &option, sizeof(option));
-		LOCAL_CHECK_ERRNO(bind, s, iter->ai_addr, iter->ai_addrlen);
-		LOCAL_CHECK_ERRNO(listen, s, INT_MAX);
-		ret = s;
-		break;
-
-#undef LOCAL_CHECK_ERRNO
-
-error:
-		close(s);
-	}
-
-	freeaddrinfo(res);
-	return ret;
-}
-
 static size_t get_maximum_cache_line_size(void)
 {
 	const int name[] = {_SC_LEVEL1_DCACHE_LINESIZE,
@@ -193,20 +120,9 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
 	CHECK_ERRNO_RETURN(global_data->signal_fd, signalfd, -1, &signals, SFD_NONBLOCK | SFD_CLOEXEC);
 	global_data->fortunes_template = get_fortunes_template(config->template_path);
 	h2o_config_init(&global_data->h2o_config);
-	global_data->listener_sd = get_listener_socket(config->bind_address, config->port);
-
-	if (global_data->listener_sd == -1)
-		goto error;
-
-	if (config->cert && config->key) {
-		global_data->https_listener_sd = get_listener_socket(config->bind_address,
-		                                                     config->https_port);
-
-		if (global_data->https_listener_sd == -1)
-			goto error;
 
+	if (config->cert && config->key)
 		initialize_openssl(config, global_data);
-	}
 
 	const h2o_iovec_t host = h2o_iovec_init(H2O_STRLIT("default"));
 	h2o_hostconf_t * const hostconf = h2o_config_register_host(&global_data->h2o_config,

+ 1 - 1
frameworks/C/h2o/src/thread.c

@@ -98,7 +98,7 @@ void start_threads(global_thread_data_t *global_thread_data)
 	// The first thread context is used by the main thread.
 	global_thread_data->thread = pthread_self();
 
-	for (size_t i = 1; i < global_thread_data->config->thread_num; i++)
+	for (size_t i = global_thread_data->config->thread_num - 1; i > 0; i--)
 		CHECK_ERROR(pthread_create,
 		            &global_thread_data[i].thread,
 		            NULL,

+ 1 - 1
frameworks/C/h2o/src/tls.c

@@ -146,7 +146,7 @@ void initialize_openssl(const config_t *config, global_data_t *global_data)
 	SSL_library_init();
 	SSL_load_error_strings();
 	openssl_global_data.num_lock = CRYPTO_num_locks();
-	openssl_global_data.lock = malloc(openssl_global_data.num_lock *
+	openssl_global_data.lock = calloc(openssl_global_data.num_lock,
 	                                  sizeof(*openssl_global_data.lock));
 	CHECK_ERROR(pthread_mutexattr_init, &openssl_global_data.lock_attr);
 	CHECK_ERROR(pthread_mutexattr_settype,

+ 20 - 7
frameworks/C/h2o/src/utility.c

@@ -19,6 +19,7 @@
 
 #include <assert.h>
 #include <h2o.h>
+#include <limits.h>
 #include <stdalign.h>
 #include <stdint.h>
 #include <stdlib.h>
@@ -27,6 +28,8 @@
 
 #include "utility.h"
 
+#define HEADER_SIZE (MAX(sizeof(intmax_t), sizeof(void(*)(void))))
+
 static void mem_pool_free(void *ctx, void *ptr);
 static void *mem_pool_malloc(void *ctx, size_t sz);
 static void *mem_pool_realloc(void *ctx, void *ptr, size_t sz);
@@ -40,12 +43,18 @@ static void mem_pool_free(void *ctx, void *ptr)
 
 static void *mem_pool_malloc(void *ctx, size_t sz)
 {
-	size_t * const p = h2o_mem_alloc_pool(ctx, sz + sizeof(*p));
-	void * const ret = p + 1;
+	void *ret = NULL;
+
+	if (SIZE_MAX - sz >= HEADER_SIZE) {
+		size_t * const p = h2o_mem_alloc_pool(ctx, sz + HEADER_SIZE);
+
+		*p = sz;
+		ret = (char *) p + HEADER_SIZE;
+		// check alignment
+		assert(!(((uintptr_t) ret) & (alignof(intmax_t) - 1)));
+		assert(!(((uintptr_t) ret) & (alignof(void(*)(void)) - 1)));
+	}
 
-	*p = sz;
-	// check alignment
-	assert(!(((uintptr_t) ret) & (alignof(void *) - 1)));
 	return ret;
 }
 
@@ -54,11 +63,13 @@ static void *mem_pool_realloc(void *ctx, void *ptr, size_t sz)
 	void *ret;
 
 	if (ptr) {
-		const size_t old_sz = ((const size_t *) ptr)[-1];
+		const size_t old_sz = *(const size_t *)((const char *) ptr - HEADER_SIZE);
 
 		if (sz > old_sz) {
 			ret = mem_pool_malloc(ctx, sz);
-			memcpy(ret, ptr, old_sz);
+
+			if (ret)
+				memcpy(ret, ptr, old_sz);
 		}
 		else
 			ret = ptr;
@@ -81,6 +92,8 @@ yajl_gen get_json_generator(h2o_mem_pool_t *pool)
 
 uint32_t get_random_number(uint32_t max_rand, unsigned int *seed)
 {
+	assert(max_rand <= (uint32_t) RAND_MAX);
+
 	// In general, RAND_MAX + 1 is not a multiple of max_rand,
 	// so rand_r() % max_rand would be biased.
 	const unsigned bucket_size = (RAND_MAX + 1U) / max_rand;

+ 2 - 2
frameworks/C/h2o/src/utility.h

@@ -31,6 +31,8 @@
 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a)))
 // mainly used to silence compiler warnings about unused function parameters
 #define IGNORE_FUNCTION_PARAMETER(p) ((void) (p))
+// Do not use the following MAX and MIN macros with parameters that have side effects.
+#define MAX(x, y) ((x) > (y) ? (x) : (y))
 #define MIN(x, y) ((x) < (y) ? (x) : (y))
 #define MKSTR(x) TOSTRING(x)
 #define TOSTRING(x) # x
@@ -61,8 +63,6 @@ typedef struct {
 	SSL_CTX *ssl_ctx;
 	global_thread_data_t *global_thread_data;
 	size_t memory_alignment;
-	int https_listener_sd;
-	int listener_sd;
 	int signal_fd;
 	bool shutdown;
 	h2o_globalconf_t h2o_config;

+ 10 - 0
frameworks/C/h2o/src/world.c

@@ -473,6 +473,11 @@ int multiple_queries(struct st_h2o_handler_t *self, h2o_req_t *req)
 	                                                      req->conn->ctx);
 
 	const size_t num_query = get_query_number(req);
+
+	// MAX_QUERIES is a relatively small number, so assume no overflow in the following
+	// arithmetic operations.
+	assert(num_query <= MAX_QUERIES);
+
 	size_t base_size = offsetof(multiple_query_ctx_t, res) + num_query * sizeof(query_result_t);
 
 	base_size = ((base_size + _Alignof(query_param_t) - 1) / _Alignof(query_param_t));
@@ -578,6 +583,11 @@ int updates(struct st_h2o_handler_t *self, h2o_req_t *req)
 	                                                      req->conn->ctx);
 
 	const size_t num_query = get_query_number(req);
+
+	// MAX_QUERIES is a relatively small number, so assume no overflow in the following
+	// arithmetic operations.
+	assert(num_query <= MAX_QUERIES);
+
 	size_t base_size = offsetof(update_ctx_t, res) + num_query * sizeof(query_result_t);
 
 	base_size = ((base_size + _Alignof(update_param_t) - 1) / _Alignof(update_param_t));