123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- /*
- Copyright (c) 2016 Anton Valentinov Kirilov
- Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
- associated documentation files (the "Software"), to deal in the Software without restriction,
- including without limitation the rights to use, copy, modify, merge, publish, distribute,
- sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in all copies or
- substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
- NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
- DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT
- OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- */
- #include <errno.h>
- #include <h2o.h>
- #include <inttypes.h>
- #include <limits.h>
- #include <netdb.h>
- #include <stdbool.h>
- #include <stdint.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
- #include <openssl/ssl.h>
- #include <sys/socket.h>
- #include <sys/syscall.h>
- #include <sys/types.h>
- #include "error.h"
- #include "event_loop.h"
- #include "global_data.h"
- #include "thread.h"
- #include "utility.h"
- #define CONN_NUM_SAMPLE_PERIOD 2500
- #define DEFAULT_TCP_FASTOPEN_QUEUE_LEN 4096
- static void accept_connection(h2o_socket_t *listener, const char *err);
- static void accept_http_connection(h2o_socket_t *listener, const char *err);
- static int get_listener_socket(const char *bind_address, uint16_t port);
- static void on_close_connection(void *data);
- static void process_messages(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages);
- static void shutdown_server(h2o_socket_t *listener, const char *err);
- static void start_accept_polling(const config_t *config,
- h2o_socket_cb accept_cb,
- bool is_https,
- event_loop_t *loop);
- static void accept_connection(h2o_socket_t *listener, const char *err)
- {
- if (err)
- ERROR(err);
- else {
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
- event_loop,
- listener->data);
- if (!ctx->shutdown) {
- size_t accepted = ctx->global_thread_data->config->max_accept;
- assert(accepted);
- do {
- h2o_socket_t * const sock = h2o_evloop_socket_accept(listener);
- if (!sock)
- break;
- ctx->event_loop.accepted_conn_num++;
- ctx->event_loop.conn_num++;
- sock->on_close.cb = on_close_connection;
- sock->on_close.data = &ctx->event_loop.conn_num;
- h2o_accept(&ctx->event_loop.h2o_accept_ctx, sock);
- } while (--accepted > 0);
- }
- }
- }
- static void accept_http_connection(h2o_socket_t *listener, const char *err)
- {
- // Assume that err is most often NULL.
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
- event_loop,
- listener->data);
- SSL_CTX * const ssl_ctx = ctx->event_loop.h2o_accept_ctx.ssl_ctx;
- ctx->event_loop.h2o_accept_ctx.ssl_ctx = NULL;
- accept_connection(listener, err);
- ctx->event_loop.h2o_accept_ctx.ssl_ctx = ssl_ctx;
- }
- static int get_listener_socket(const char *bind_address, uint16_t port)
- {
- int ret = -1;
- char buf[16];
- if ((size_t) snprintf(buf, sizeof(buf), "%" PRIu16, port) >= sizeof(buf)) {
- LIBRARY_ERROR("snprintf", "Truncated output.");
- return ret;
- }
- struct addrinfo *res = NULL;
- struct addrinfo hints = {.ai_socktype = SOCK_STREAM, .ai_flags = AI_PASSIVE};
- const int error_code = getaddrinfo(bind_address, buf, &hints, &res);
- if (error_code) {
- LIBRARY_ERROR("getaddrinfo", gai_strerror(error_code));
- return ret;
- }
- for (const struct addrinfo *iter = res; iter; iter = iter->ai_next) {
- const int s = socket(iter->ai_family,
- iter->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
- iter->ai_protocol);
- if (s == -1) {
- STANDARD_ERROR("socket");
- continue;
- }
- #define LOCAL_CHECK_ERRNO(function, ...) \
- do { \
- const int error_code = (function)(__VA_ARGS__); \
- \
- if (error_code) { \
- print_library_error(__FILE__, __LINE__, #function, errno); \
- goto error; \
- } \
- } while(0)
- int option = 1;
- LOCAL_CHECK_ERRNO(setsockopt, s, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));
- LOCAL_CHECK_ERRNO(setsockopt, s, SOL_SOCKET, SO_REUSEPORT, &option, sizeof(option));
- LOCAL_CHECK_ERRNO(setsockopt, s, IPPROTO_TCP, TCP_QUICKACK, &option, sizeof(option));
- option = H2O_DEFAULT_HANDSHAKE_TIMEOUT_IN_SECS;
- LOCAL_CHECK_ERRNO(setsockopt, s, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option));
- option = DEFAULT_TCP_FASTOPEN_QUEUE_LEN;
- LOCAL_CHECK_ERRNO(setsockopt, s, IPPROTO_TCP, TCP_FASTOPEN, &option, sizeof(option));
- LOCAL_CHECK_ERRNO(bind, s, iter->ai_addr, iter->ai_addrlen);
- LOCAL_CHECK_ERRNO(listen, s, INT_MAX);
- ret = s;
- break;
- #undef LOCAL_CHECK_ERRNO
- error:
- close(s);
- }
- freeaddrinfo(res);
- if (ret == -1)
- abort();
- return ret;
- }
// Socket close hook: decrements the connection counter that `data` points to.
//
// data - pointer to a size_t counter
static void on_close_connection(void *data)
{
	size_t *live_connections = data;

	--*live_connections;
}
- static void process_messages(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages)
- {
- global_thread_data_t * const global_thread_data = H2O_STRUCT_FROM_MEMBER(global_thread_data_t,
- h2o_receiver,
- receiver);
- while (!h2o_linklist_is_empty(messages)) {
- message_t * const msg = H2O_STRUCT_FROM_MEMBER(message_t, super, messages->next);
- h2o_linklist_unlink(&msg->super.link);
- switch (msg->type) {
- case SHUTDOWN:
- // Close the listening sockets immediately, so that if another instance of
- // the application is started before the current one exits (e.g. when doing
- // an update), it will accept all incoming connections.
- if (global_thread_data->ctx->event_loop.h2o_https_socket) {
- h2o_socket_read_stop(global_thread_data->ctx->event_loop.h2o_https_socket);
- h2o_socket_close(global_thread_data->ctx->event_loop.h2o_https_socket);
- global_thread_data->ctx->event_loop.h2o_https_socket = NULL;
- }
- if (global_thread_data->ctx->event_loop.h2o_socket) {
- h2o_socket_read_stop(global_thread_data->ctx->event_loop.h2o_socket);
- h2o_socket_close(global_thread_data->ctx->event_loop.h2o_socket);
- global_thread_data->ctx->event_loop.h2o_socket = NULL;
- }
- global_thread_data->ctx->shutdown = true;
- break;
- case TASK:
- {
- task_message_t * const task = H2O_STRUCT_FROM_MEMBER(task_message_t, super, msg);
- task->task(task->arg);
- break;
- }
- default:
- break;
- }
- free(msg);
- }
- }
- static void shutdown_server(h2o_socket_t *listener, const char *err)
- {
- if (err)
- ERROR(err);
- else {
- thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
- event_loop,
- listener->data);
- ctx->shutdown = true;
- // Close the listening sockets immediately, so that if another instance
- // of the application is started before the current one exits (e.g. when
- // doing an update), it will accept all incoming connections.
- if (ctx->event_loop.h2o_https_socket) {
- h2o_socket_read_stop(ctx->event_loop.h2o_https_socket);
- h2o_socket_close(ctx->event_loop.h2o_https_socket);
- ctx->event_loop.h2o_https_socket = NULL;
- }
- if (ctx->event_loop.h2o_socket) {
- h2o_socket_read_stop(ctx->event_loop.h2o_socket);
- h2o_socket_close(ctx->event_loop.h2o_socket);
- ctx->event_loop.h2o_socket = NULL;
- }
- global_thread_data_t * const global_thread_data =
- ctx->global_thread_data->global_data->global_thread_data;
- for (size_t i = ctx->global_thread_data->config->thread_num - 1; i > 0; i--) {
- message_t * const msg = h2o_mem_alloc(sizeof(*msg));
- memset(msg, 0, sizeof(*msg));
- msg->type = SHUTDOWN;
- send_message(msg, &global_thread_data[i].h2o_receiver);
- }
- }
- }
- static void start_accept_polling(const config_t *config,
- h2o_socket_cb accept_cb,
- bool is_https,
- event_loop_t *loop)
- {
- const int listener_sd = get_listener_socket(config->bind_address,
- is_https ? config->https_port : config->port);
- // Let all the threads race to call accept() on the socket; since the latter is
- // non-blocking, that will virtually act as load balancing, and SO_REUSEPORT
- // will make it efficient.
- h2o_socket_t * const h2o_socket = h2o_evloop_socket_create(loop->h2o_ctx.loop,
- listener_sd,
- H2O_SOCKET_FLAG_DONT_READ);
- if (is_https)
- loop->h2o_https_socket = h2o_socket;
- else
- loop->h2o_socket = h2o_socket;
- h2o_socket->data = loop;
- h2o_socket_read_start(h2o_socket, accept_cb);
- }
- void event_loop(struct thread_context_t *ctx)
- {
- uint64_t last_sample = 0;
- while (!ctx->shutdown || ctx->event_loop.conn_num) {
- h2o_evloop_run(ctx->event_loop.h2o_ctx.loop, INT32_MAX);
- process_messages(&ctx->global_thread_data->h2o_receiver,
- &ctx->event_loop.local_messages);
- const uint64_t now = h2o_now(ctx->event_loop.h2o_ctx.loop);
- if (now - last_sample > CONN_NUM_SAMPLE_PERIOD || last_sample > now) {
- const size_t i = ctx->event_loop.conn_num_sample_idx;
- ctx->event_loop.conn_num_sample[i] = ctx->event_loop.conn_num;
- ctx->event_loop.conn_num_sample_idx =
- (i + 1) % ARRAY_SIZE(ctx->event_loop.conn_num_sample);
- last_sample = now;
- }
- }
- flockfile(stdout);
- printf("Thread %ld statistics:\nAccepted connections: %zu\nConnection number samples: %zu",
- syscall(SYS_gettid),
- ctx->event_loop.accepted_conn_num,
- *ctx->event_loop.conn_num_sample);
- for (size_t i = 1; i < ARRAY_SIZE(ctx->event_loop.conn_num_sample); i++)
- printf(",%zu", ctx->event_loop.conn_num_sample[i]);
- putc_unlocked('\n', stdout);
- funlockfile(stdout);
- }
- void free_event_loop(event_loop_t *event_loop, h2o_multithread_receiver_t *h2o_receiver)
- {
- if (event_loop->h2o_https_socket)
- h2o_socket_close(event_loop->h2o_https_socket);
- if (event_loop->h2o_socket)
- h2o_socket_close(event_loop->h2o_socket);
- h2o_multithread_unregister_receiver(event_loop->h2o_ctx.queue, h2o_receiver);
- h2o_loop_t * const loop = event_loop->h2o_ctx.loop;
- h2o_context_dispose(&event_loop->h2o_ctx);
- h2o_evloop_destroy(loop);
- }
- void initialize_event_loop(bool is_main_thread,
- global_data_t *global_data,
- h2o_multithread_receiver_t *h2o_receiver,
- event_loop_t *loop)
- {
- h2o_socket_cb accept_cb = accept_connection;
- const config_t * const config = global_data->global_thread_data->config;
- memset(loop, 0, sizeof(*loop));
- h2o_context_init(&loop->h2o_ctx, h2o_evloop_create(), &global_data->h2o_config);
- loop->h2o_accept_ctx.ctx = &loop->h2o_ctx;
- loop->h2o_accept_ctx.hosts = global_data->h2o_config.hosts;
- h2o_linklist_init_anchor(&loop->local_messages);
- if (global_data->ssl_ctx) {
- loop->h2o_accept_ctx.ssl_ctx = global_data->ssl_ctx;
- start_accept_polling(config, accept_connection, true, loop);
- // Assume that the majority of the connections use HTTPS,
- // so HTTP can take a few extra operations.
- accept_cb = accept_http_connection;
- }
- start_accept_polling(config, accept_cb, false, loop);
- h2o_multithread_register_receiver(loop->h2o_ctx.queue,
- h2o_receiver,
- process_messages);
- if (is_main_thread) {
- global_data->signals = h2o_evloop_socket_create(loop->h2o_ctx.loop,
- global_data->signal_fd,
- H2O_SOCKET_FLAG_DONT_READ);
- global_data->signals->data = loop;
- h2o_socket_read_start(global_data->signals, shutdown_server);
- }
- }
- void send_local_message(message_t *msg, h2o_linklist_t *local_messages)
- {
- h2o_linklist_insert(local_messages, &msg->super.link);
- }
- void send_message(message_t *msg, h2o_multithread_receiver_t *h2o_receiver)
- {
- h2o_multithread_send_message(h2o_receiver, &msg->super);
- }
|