
libreactor: Enable SO_ATTACH_REUSEPORT_CBPF (#6244)

* libreactor: enable SO_ATTACH_REUSEPORT_CBPF

SO_ATTACH_REUSEPORT_CBPF is a socket option that attaches a small classic BPF program to a SO_REUSEPORT socket group. The program assigns each incoming packet to a socket based on the id of the CPU that initially received the packet and did the IRQ processing. This improves data locality and therefore increases performance.
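
For reference, the attached "program" is just two classic BPF instructions: load the id of the CPU handling the packet, then return it as the index of the socket to deliver to. A minimal sketch (fd is assumed to be a listening SO_REUSEPORT socket; the same code appears in the helpers.c diff below):

    #include <linux/filter.h>   /* struct sock_filter, struct sock_fprog, SKF_AD_* */
    #include <sys/socket.h>     /* setsockopt, SOL_SOCKET */
    #include <err.h>

    /* A = id of the CPU that received the packet; return A as the socket index */
    struct sock_filter code[] = {
        {BPF_LD | BPF_W | BPF_ABS, 0, 0, SKF_AD_OFF + SKF_AD_CPU},
        {BPF_RET | BPF_A, 0, 0, 0},
    };
    struct sock_fprog prog = { .len = sizeof(code)/sizeof(code[0]), .filter = code };

    if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF, &prog, sizeof(prog)) == -1)
        err(1, "SO_ATTACH_REUSEPORT_CBPF");

The kernel uses the program's return value as the index of the socket within the reuseport group, so a packet processed by CPU n lands on the nth socket opened into the group.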

* libreactor: Improve SO_ATTACH_REUSEPORT_CBPF performance by controlling worker forking order

Rename setup() to fork_workers() to make its purpose clearer

The standard BPF program used with SO_ATTACH_REUSEPORT_CBPF assigns a packet to a socket based on the core id of the CPU that initially received the packet and did the IRQ processing: a packet handled by CPU 0 goes to socket 0, and so on. The idea is that if the packet is then passed to userland code running on the same CPU, things are more efficient. However, contrary to my initial assumption, there isn't an automatic mapping between the id of a socket and the id of the CPU that the userland process (which opened the socket) is running on. The "id" of a socket is determined by the order in which the sockets in the group are opened. So the scheme works best if the order in which the sockets are opened is controlled to match the order in which processes are pinned to CPUs.

Previously, the for loop in setup() (a) forked a child process, (b) pinned it to a CPU, and then (c) started up an instance of the libreactor server. However, since fork() was called inside the loop, the order in which the sockets were opened by the child processes was not deterministic. In some cases the process pinned to CPU 0 would actually be the third process to open a socket, so it would receive packets that had been handled on the kernel side by CPU 2, which of course brings no efficiency gain.

To resolve this, I use an eventfd semaphore to communicate between the parent and child processes, ensuring that the forking happens sequentially and that the order in which sockets are opened matches the order in which processes are pinned to CPUs. With this change I see a much more consistent performance improvement.
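
The handshake is small enough to summarize inline. A simplified sketch of what fork_workers() and the worker's main() do (error handling omitted; see the full diff below):

    #include <sys/eventfd.h>
    #include <unistd.h>

    int efd = eventfd(0, EFD_SEMAPHORE); /* a fresh eventfd for each fork() */
    pid_t pid = fork();
    if (pid > 0) {
        /* Parent: block until this child has pinned itself and opened its socket */
        eventfd_t v;
        eventfd_read(efd, &v);
        close(efd);
        /* ...only now continue the loop and fork the next worker... */
    } else {
        /* Child: pin to its CPU, open the listening socket, then release the parent */
        /* sched_setaffinity(...); server_open(...); */
        eventfd_write(efd, 1);
        close(efd);
    }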

* libreactor: Upgrade to newly released libdynamic 2.2.0
Marc Richards · 4 years ago · commit 987c956587

+ 3 - 4
frameworks/C/libreactor/libreactor-server.dockerfile

@@ -7,10 +7,9 @@ WORKDIR /build
 
 ENV CC=gcc-10 AR=gcc-ar-10 NM=gcc-nm-10 RANLIB=gcc-ranlib-10
 
-RUN git clone https://github.com/fredrikwidlund/libdynamic && \
-    cd libdynamic && \
-    git checkout aee8f053c113 && \
-    ./autogen.sh && \
+RUN wget -q https://github.com/fredrikwidlund/libdynamic/releases/download/v2.2.0/libdynamic-2.2.0.tar.gz && \
+    tar xfz libdynamic-2.2.0.tar.gz && \
+    cd libdynamic-2.2.0 && \
     ./configure && \
     make install
 

+ 3 - 4
frameworks/C/libreactor/libreactor.dockerfile

@@ -7,10 +7,9 @@ WORKDIR /build
 
 ENV CC=gcc-10 AR=gcc-ar-10 NM=gcc-nm-10 RANLIB=gcc-ranlib-10
 
-RUN git clone https://github.com/fredrikwidlund/libdynamic && \
-    cd libdynamic && \
-    git checkout aee8f053c113 && \
-    ./autogen.sh && \
+RUN wget -q https://github.com/fredrikwidlund/libdynamic/releases/download/v2.2.0/libdynamic-2.2.0.tar.gz && \
+    tar xfz libdynamic-2.2.0.tar.gz && \
+    cd libdynamic-2.2.0 && \
     ./configure && \
     make install
 

+ 78 - 21
frameworks/C/libreactor/src/helpers.c

@@ -7,6 +7,9 @@
 #include <string.h>
 #include <sched.h>
 #include <sys/wait.h>
+#include <sys/eventfd.h>
+#include <netinet/in.h>
+#include <linux/filter.h>
 #include <err.h>
 
 #include <dynamic.h>
@@ -80,35 +83,89 @@ void json(server_context *context, clo *json_object)
   write_response(&context->session->stream, json_preamble, segment_string(json_string));
 }
 
-void setup()
+void enable_reuseport_cbpf(server *s)
 {
+  struct sock_filter code[] = {{BPF_LD | BPF_W | BPF_ABS, 0, 0, SKF_AD_OFF + SKF_AD_CPU}, {BPF_RET | BPF_A, 0, 0, 0}};
+  struct sock_fprog prog = { .len = sizeof(code)/sizeof(code[0]), .filter = code };
   int e;
+
+  e = setsockopt(s->fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF, &prog, sizeof(prog));
+  if (e == -1)
+    err(1, "SO_ATTACH_REUSEPORT_CBPF");
+}
+
+int fork_workers()
+{
+  int e, efd, worker_count = 0;
   pid_t pid;
-  cpu_set_t available_cpus, cpu;
+  eventfd_t eventfd_value;
+  cpu_set_t online_cpus, cpu;
 
   signal(SIGPIPE, SIG_IGN);
-  CPU_ZERO(&available_cpus);
-  sched_getaffinity(0, sizeof(available_cpus), &available_cpus); // Get set of all available CPUs
 
-  for (int i = 0; i < CPU_SETSIZE; i++)
+  // Get set/count of all online CPUs
+  CPU_ZERO(&online_cpus);
+  sched_getaffinity(0, sizeof(online_cpus), &online_cpus);
+  int num_online_cpus = CPU_COUNT(&online_cpus);
+
+  // Create a mapping between the relative cpu id and absolute cpu id for cases where the cpu ids are not contiguous
+  // E.g if only cpus 0, 1, 8, and 9 are visible to the app because taskset was used or because some cpus are offline
+  // then the mapping is 0 -> 0, 1 -> 1, 2 -> 8, 3 -> 9
+  int rel_to_abs_cpu[num_online_cpus];
+  int rel_cpu_index = 0;
+
+  for (int abs_cpu_index = 0; abs_cpu_index < CPU_SETSIZE; abs_cpu_index++) {
+    if (CPU_ISSET(abs_cpu_index, &online_cpus)){
+      rel_to_abs_cpu[rel_cpu_index] = abs_cpu_index;
+      rel_cpu_index++;
+
+      if (rel_cpu_index == num_online_cpus)
+        break;
+    }
+  }
+
+  // fork a new child/worker process for each available cpu
+  for (int i = 0; i < num_online_cpus; i++)
   {
-    if (CPU_ISSET(i, &available_cpus))
+    // Create an eventfd to communicate with the forked child process on each iteration
+    // This ensures that the order of forking is deterministic which is important when using SO_ATTACH_REUSEPORT_CBPF
+    efd = eventfd(0, EFD_SEMAPHORE);
+    if (efd == -1)
+      err(1, "eventfd");
+
+    pid = fork();
+    if (pid == -1)
+      err(1, "fork");
+
+    // Parent process. Block the for loop until the child has set cpu affinity AND started listening on its socket
+    if (pid > 0)
+    {
+      // Block waiting for the child process to update the eventfd semaphore as a signal to proceed
+      eventfd_read(efd, &eventfd_value);
+      close(efd);
+
+      worker_count++;
+      (void) fprintf(stderr, "Worker running on CPU %d\n", rel_to_abs_cpu[i]);
+      continue;
+    }
+
+    // Child process. Set cpu affinity and return eventfd
+    if (pid == 0)
     {
-      pid = fork();
-      if (pid == -1)
-        err(1, "fork");
-
-      if (pid == 0)
-      {
-       CPU_ZERO(&cpu);
-       CPU_SET(i, &cpu);
-        e = sched_setaffinity(0, sizeof cpu, &cpu);
-        if (e == -1)
-          err(1, "sched_setaffinity");
-
-        return;
-      }
+      CPU_ZERO(&cpu);
+      CPU_SET(rel_to_abs_cpu[i], &cpu);
+      e = sched_setaffinity(0, sizeof cpu, &cpu);
+      if (e == -1)
+        err(1, "sched_setaffinity");
+
+      // Break out of the for loop and continue running main. The child will signal the parent once the socket is open
+      return efd;
     }
   }
-  wait(NULL);
+
+  (void) fprintf(stderr, "libreactor running with %d worker processes\n", worker_count);
+
+  wait(NULL); // wait for children to exit
+  (void) fprintf(stderr, "A worker process has exited unexpectedly. Shutting down.\n");
+  exit(EXIT_FAILURE);
 }

+ 3 - 1
frameworks/C/libreactor/src/helpers.h

@@ -5,6 +5,8 @@ void plaintext(server_context *context, char *response);
 
 void json(server_context *context, clo *json_object);
 
-void setup();
+void enable_reuseport_cbpf(server *s);
+
+int fork_workers();
 
 #endif /* HELPERS_H_INCLUDED */

+ 10 - 1
frameworks/C/libreactor/src/libreactor-server.c

@@ -3,6 +3,7 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <err.h>
+#include <sys/eventfd.h>
 
 #include <dynamic.h>
 #include <reactor.h>
@@ -44,13 +45,21 @@ static core_status server_handler(core_event *event)
 
 int main()
 {
+  int parent_eventfd;
   server s;
 
-  setup();
+  // fork_workers() forks a separate child/worker process for each available cpu and returns an eventfd from the parent
+  // The eventfd is used to signal the parent. This guarantees the forking order needed for REUSEPORT_CBPF to work well
+  parent_eventfd = fork_workers();
+
   core_construct(NULL);
   server_construct(&s, server_handler, &s);
   server_open(&s, 0, 8080);
 
+  // Signal the parent process so that it can proceed with the next fork
+  eventfd_write(parent_eventfd, (eventfd_t) 1);
+  close(parent_eventfd);
+
   core_loop(NULL);
   core_destruct(NULL);
 }

+ 11 - 1
frameworks/C/libreactor/src/libreactor.c

@@ -3,6 +3,7 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <err.h>
+#include <sys/eventfd.h>
 
 #include <dynamic.h>
 #include <reactor.h>
@@ -42,12 +43,21 @@ static core_status server_handler(core_event *event)
 
 int main()
 {
+  int parent_eventfd;
   server s;
 
-  setup();
+  // fork_workers() forks a separate child/worker process for each available cpu and returns an eventfd from the parent
+  // The eventfd is used to signal the parent. This guarantees the forking order needed for REUSEPORT_CBPF to work well
+  parent_eventfd = fork_workers();
+
   core_construct(NULL);
   server_construct(&s, server_handler, &s);
   server_open(&s, 0, 8080);
+  enable_reuseport_cbpf(&s);
+
+  // Signal the parent process so that it can proceed with the next fork
+  eventfd_write(parent_eventfd, (eventfd_t) 1);
+  close(parent_eventfd);
 
   core_loop(NULL);
   core_destruct(NULL);