소스 검색

Adding Matt Holiday's fixes to suspend/resume logic
(plus documentation updates).

Christian Grothoff 12 년 전
부모
커밋
04ca76e78e
6개의 변경된 파일264개의 추가작업 그리고 70개의 파일을 삭제
  1. 29 0
      ChangeLog
  2. 7 1
      doc/libmicrohttpd.texi
  3. 7 1
      src/include/microhttpd.h
  4. 11 4
      src/microhttpd/connection.c
  5. 186 64
      src/microhttpd/daemon.c
  6. 24 0
      src/microhttpd/internal.h

+ 29 - 0
ChangeLog

@@ -1,3 +1,32 @@
+Tue Dec  3 21:05:38 CET 2013
+	Signaling n times for shutdown works, but for resume we need to
+	wake up the correct daemon. Even if we signal n times in that
+	case also, there's no guarantee that some daemon can't run
+	through its select loop more than once before the daemon we want
+	to wake up gets a chance to read.  Thus we need a signal pipe
+	per thread in the thread pool IF MHD_suspend_connection is used.
+	This introduces a new flag MHD_USE_SUSPEND_RESUME to add those
+	additional pipes and only allow MHD_suspend_connection to be
+	used in conjunction with this flag.
+
+	Also, as MHD_resume_connection() will be called on a non-daemon
+	thread, but none of the queue insert/delete calls are thread safe,
+	we need to be concerned about (a) corrupting the queue, and (b)
+	having to add mutex protection around every access to the queues,
+	including loops through timer queues, etc. This wasn't a problem
+	before adding resume; even suspend should be safe since it happens
+	in a callback from the daemon.
+
+	I think it's easier to (a) have MHD_suspend_connection() move the
+	connection to a suspended queue, (b) have MHD_resume_connection()
+	mark the connection as resuming, and then (c) do all the actual
+	queue manipulations in MHD_select (poll, epoll, etc.) to move the
+	resumed connections back to their normal queues, in response to
+	the wake up. The changes are simpler & cleaner. There is a cost to
+	the basic select loop that is avoided by making suspend/resume a
+	startup option. The per-worker pipes can then also be enabled only
+	with that option set. -MH
+
 Fri Nov 29 20:17:03 CET 2013
 	Eliminating theoretical stack overflow by limiting length
 	of URIs in authentication headers to 32k (only applicable

+ 7 - 1
doc/libmicrohttpd.texi

@@ -520,6 +520,12 @@ use of this option is automatic (as in, you do not even have to
 specify it), if @code{MHD_USE_NO_LISTEN_SOCKET} is specified.  In
 "external" select mode, this option is always simply ignored.
 
+@item MHD_USE_SUSPEND_RESUME
+Enables using @code{MHD_suspend_connection} and
+@code{MHD_resume_connection}, as performing these calls requires some
+additional pipes to be created, and code not using these calls should
+not pay the cost.
+
 @end table
 @end deftp
 
@@ -1856,7 +1862,7 @@ select, internal select or thread pool; not applicable to
 thread-per-connection!) for a while.
 
 If you use this API in conjunction with a internal select or a
-thread pool, you must set the option @code{MHD_USE_PIPE_FOR_SHUTDOWN} to
+thread pool, you must set the option @code{MHD_USE_SUSPEND_RESUME} to
 ensure that a resumed connection is immediately processed by MHD.
 
 Suspended connections continue to count against the total number of

+ 7 - 1
src/include/microhttpd.h

@@ -518,7 +518,13 @@ enum MHD_FLAG
    * Enalbed always on W32 as winsock does not properly behave
    * with `shutdown()` and this then fixes potential problems.
    */
-  MHD_USE_EPOLL_TURBO = 4096
+  MHD_USE_EPOLL_TURBO = 4096,
+
+  /**
+   * Enable suspend/resume functions, which also implies setting up
+   * pipes to signal resume.
+   */
+  MHD_USE_SUSPEND_RESUME = 8192 | MHD_USE_PIPE_FOR_SHUTDOWN
 
 };
 

+ 11 - 4
src/microhttpd/connection.c

@@ -2079,16 +2079,23 @@ cleanup_connection (struct MHD_Connection *connection)
     XDLL_remove (daemon->manual_timeout_head,
 		 daemon->manual_timeout_tail,
 		 connection);
-  DLL_remove (daemon->connections_head,
-	      daemon->connections_tail,
-	      connection);
+  if (MHD_YES == connection->suspended)
+    DLL_remove (daemon->suspended_connections_head,
+                daemon->suspended_connections_tail,
+                connection);
+  else
+    DLL_remove (daemon->connections_head,
+                daemon->connections_tail,
+                connection);
   DLL_insert (daemon->cleanup_head,
 	      daemon->cleanup_tail,
 	      connection);
+  connection->suspended = MHD_NO;
+  connection->resuming = MHD_NO;
+  connection->in_idle = MHD_NO;
   if ( (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION)) &&
        (0 != pthread_mutex_unlock(&daemon->cleanup_connection_mutex)) )
     MHD_PANIC ("Failed to release cleanup mutex\n");
-  connection->in_idle = MHD_NO;
 }
 
 

+ 186 - 64
src/microhttpd/daemon.c

@@ -1026,7 +1026,6 @@ internal_add_connection (struct MHD_Daemon *daemon,
 #if OSX
   static int on = 1;
 #endif
-
   if (NULL != daemon->worker_pool)
     {
       /* have a pool, try to find a pool with capacity; we use the
@@ -1069,7 +1068,7 @@ internal_add_connection (struct MHD_Daemon *daemon,
 
 #if HAVE_MESSAGES
 #if DEBUG_CONNECT
-  MHD_DLOG (daemon, "Accepted connection on socket %d\n", s);
+  MHD_DLOG (daemon, "Accepted connection on socket %d\n", client_socket);
 #endif
 #endif
   if ( (0 == daemon->max_connections) ||
@@ -1396,14 +1395,17 @@ MHD_suspend_connection (struct MHD_Connection *connection)
   struct MHD_Daemon *daemon;
 
   daemon = connection->daemon;
-  if (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION))
-    MHD_PANIC ("Cannot suspend connections in THREAD_PER_CONNECTION mode!\n");
+  if (0 == (daemon->options & MHD_USE_SUSPEND_RESUME))
+    MHD_PANIC ("Cannot suspend connections without enabling MHD_USE_SUSPEND_RESUME!\n");
   if ( (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION)) &&
        (0 != pthread_mutex_lock (&daemon->cleanup_connection_mutex)) )
     MHD_PANIC ("Failed to acquire cleanup mutex\n");
   DLL_remove (daemon->connections_head,
               daemon->connections_tail,
               connection);
+  DLL_insert (daemon->suspended_connections_head,
+              daemon->suspended_connections_tail,
+              connection);
   if (connection->connection_timeout == daemon->connection_timeout)
     XDLL_remove (daemon->normal_timeout_head,
                  daemon->normal_timeout_tail,
@@ -1414,25 +1416,26 @@ MHD_suspend_connection (struct MHD_Connection *connection)
                  connection);
 #if EPOLL_SUPPORT
   if (0 != (daemon->options & MHD_USE_EPOLL_LINUX_ONLY))
-  {
-    if (0 != (connection->epoll_state & MHD_EPOLL_STATE_IN_EREADY_EDLL))
-      {
-        EDLL_remove (daemon->eready_head,
-                     daemon->eready_tail,
-                     connection);
-      }
-    if (0 != (connection->epoll_state & MHD_EPOLL_STATE_IN_EPOLL_SET))
-      {
-        if (0 != epoll_ctl (daemon->epoll_fd,
-                            EPOLL_CTL_DEL,
-                            connection->socket_fd,
-                            NULL))
-          MHD_PANIC ("Failed to remove FD from epoll set\n");
-        connection->epoll_state &= ~MHD_EPOLL_STATE_IN_EPOLL_SET;
-      }
-    connection->epoll_state |= MHD_EPOLL_STATE_SUSPENDED;
-  }
+    {
+      if (0 != (connection->epoll_state & MHD_EPOLL_STATE_IN_EREADY_EDLL))
+        {
+          EDLL_remove (daemon->eready_head,
+                       daemon->eready_tail,
+                       connection);
+        }
+      if (0 != (connection->epoll_state & MHD_EPOLL_STATE_IN_EPOLL_SET))
+        {
+          if (0 != epoll_ctl (daemon->epoll_fd,
+                              EPOLL_CTL_DEL,
+                              connection->socket_fd,
+                              NULL))
+            MHD_PANIC ("Failed to remove FD from epoll set\n");
+          connection->epoll_state &= ~MHD_EPOLL_STATE_IN_EPOLL_SET;
+        }
+      connection->epoll_state |= MHD_EPOLL_STATE_SUSPENDED;
+    }
 #endif
+  connection->suspended = MHD_YES;
   if ( (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION)) &&
        (0 != pthread_mutex_unlock (&daemon->cleanup_connection_mutex)) )
     MHD_PANIC ("Failed to release cleanup mutex\n");
@@ -1453,50 +1456,15 @@ MHD_resume_connection (struct MHD_Connection *connection)
   struct MHD_Daemon *daemon;
 
   daemon = connection->daemon;
-  if (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION))
-    MHD_PANIC ("Cannot resume connections in THREAD_PER_CONNECTION mode!\n");
+  if (0 == (daemon->options & MHD_USE_SUSPEND_RESUME))
+    MHD_PANIC ("Cannot resume connections without enabling MHD_USE_SUSPEND_RESUME!\n");
   if ( (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION)) &&
        (0 != pthread_mutex_lock (&daemon->cleanup_connection_mutex)) )
     MHD_PANIC ("Failed to acquire cleanup mutex\n");
-  DLL_insert (daemon->connections_head,
-              daemon->connections_tail,
-              connection);
-  if (connection->connection_timeout == daemon->connection_timeout)
-    XDLL_insert (daemon->normal_timeout_head,
-                 daemon->normal_timeout_tail,
-                 connection);
-  else
-    XDLL_insert (daemon->manual_timeout_head,
-                 daemon->manual_timeout_tail,
-                 connection);
-#if EPOLL_SUPPORT
-  if (0 != (daemon->options & MHD_USE_EPOLL_LINUX_ONLY))
-  {
-    if (0 != (connection->epoll_state & MHD_EPOLL_STATE_IN_EREADY_EDLL))
-      {
-        EDLL_insert (daemon->eready_head,
-                     daemon->eready_tail,
-                     connection);
-      }
-    else
-      {
-        struct epoll_event event;
-
-        event.events = EPOLLIN | EPOLLOUT | EPOLLET;
-        event.data.ptr = connection;
-        if (0 != epoll_ctl (daemon->epoll_fd,
-                            EPOLL_CTL_ADD,
-                            connection->socket_fd,
-                            &event))
-          MHD_PANIC ("Failed to add FD to epoll set\n");
-        else
-          connection->epoll_state |= MHD_EPOLL_STATE_IN_EPOLL_SET;
-    }
-    connection->epoll_state &= ~MHD_EPOLL_STATE_SUSPENDED;
-  }
-#endif
+  connection->resuming = MHD_YES;
+  daemon->resuming = MHD_YES;
   if ( (-1 != daemon->wpipe[1]) &&
-       (1 != WRITE (daemon->wpipe[1], "n", 1)) )
+       (1 != WRITE (daemon->wpipe[1], "r", 1)) )
     {
 #if HAVE_MESSAGES
       MHD_DLOG (daemon,
@@ -1508,6 +1476,80 @@ MHD_resume_connection (struct MHD_Connection *connection)
     MHD_PANIC ("Failed to release cleanup mutex\n");
 }
 
+/**
+ * Run through the suspended connections and move any that are no
+ * longer suspended back to the active state.
+ *
+ * @param daemon daemon context
+ */
+static void
+resume_suspended_connections (struct MHD_Daemon *daemon)
+{
+  struct MHD_Connection *pos;
+  struct MHD_Connection *next = NULL;
+
+  if ( (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION)) &&
+       (0 != pthread_mutex_lock (&daemon->cleanup_connection_mutex)) )
+    MHD_PANIC ("Failed to acquire cleanup mutex\n");
+
+  if (MHD_YES == daemon->resuming)
+    next = daemon->suspended_connections_head;
+
+  while (NULL != (pos = next))
+    {
+      next = pos->next;
+      if (MHD_NO == pos->resuming)
+        continue;
+ 
+      DLL_remove (daemon->suspended_connections_head,
+                  daemon->suspended_connections_tail,
+                  pos);
+      DLL_insert (daemon->connections_head,
+                  daemon->connections_tail,
+                  pos);
+      if (pos->connection_timeout == daemon->connection_timeout)
+        XDLL_insert (daemon->normal_timeout_head,
+                     daemon->normal_timeout_tail,
+                     pos);
+      else
+        XDLL_insert (daemon->manual_timeout_head,
+                     daemon->manual_timeout_tail,
+                     pos);
+#if EPOLL_SUPPORT
+      if (0 != (daemon->options & MHD_USE_EPOLL_LINUX_ONLY))
+        {
+          if (0 != (pos->epoll_state & MHD_EPOLL_STATE_IN_EREADY_EDLL))
+            {
+              EDLL_insert (daemon->eready_head,
+                           daemon->eready_tail,
+                           pos);
+            }
+          else
+            {
+              struct epoll_event event;
+
+              event.events = EPOLLIN | EPOLLOUT | EPOLLET;
+              event.data.ptr = pos;
+              if (0 != epoll_ctl (daemon->epoll_fd,
+                                  EPOLL_CTL_ADD,
+                                  pos->socket_fd,
+                                  &event))
+                MHD_PANIC ("Failed to add FD to epoll set\n");
+              else
+                pos->epoll_state |= MHD_EPOLL_STATE_IN_EPOLL_SET;
+            }
+          pos->epoll_state &= ~MHD_EPOLL_STATE_SUSPENDED;
+        }
+#endif
+      pos->suspended = MHD_NO;
+      pos->resuming = MHD_NO;
+    }
+  daemon->resuming = MHD_NO;
+  if ( (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION)) &&
+       (0 != pthread_mutex_unlock (&daemon->cleanup_connection_mutex)) )
+    MHD_PANIC ("Failed to release cleanup mutex\n");
+}
+
 
 /**
  * Change socket options to be non-blocking, non-inheritable.
@@ -1995,6 +2037,9 @@ MHD_select (struct MHD_Daemon *daemon,
   max = -1;
   if (0 == (daemon->options & MHD_USE_THREAD_PER_CONNECTION))
     {
+      if (0 != (daemon->options & MHD_USE_SUSPEND_RESUME))
+        resume_suspended_connections (daemon);
+
       /* single-threaded, go over everything */
       if (MHD_NO == MHD_get_fdset (daemon, &rs, &ws, &es, &max))
         return MHD_NO;
@@ -2072,6 +2117,9 @@ MHD_poll_all (struct MHD_Daemon *daemon,
   struct MHD_Connection *pos;
   struct MHD_Connection *next;
 
+  if (0 != (daemon->options & MHD_USE_SUSPEND_RESUME))
+    resume_suspended_connections (daemon);
+
   /* count number of connections and thus determine poll set size */
   num_connections = 0;
   for (pos = daemon->connections_head; NULL != pos; pos = pos->next)
@@ -2322,6 +2370,7 @@ MHD_epoll (struct MHD_Daemon *daemon,
   int num_events;
   unsigned int i;
   unsigned int series_length;
+  char tmp;
 
   if (-1 == daemon->epoll_fd)
     return MHD_NO; /* we're down! */
@@ -2402,6 +2451,12 @@ MHD_epoll (struct MHD_Daemon *daemon,
 	{
 	  if (NULL == events[i].data.ptr)
 	    continue; /* shutdown signal! */
+      if ( (-1 != daemon->wpipe[0]) &&
+           (daemon->wpipe[0] == events[i].data.fd) )
+        {
+          (void) read (daemon->wpipe[0], &tmp, sizeof (tmp));
+          continue;
+        }
 	  if (daemon != events[i].data.ptr)
 	    {
 	      /* this is an event relating to a 'normal' connection,
@@ -2447,6 +2502,11 @@ MHD_epoll (struct MHD_Daemon *daemon,
 	}
     }
 
+  /* we handle resumes here because we may have ready connections
+     that will not be placed into the epoll list immediately. */
+  if (0 != (daemon->options & MHD_USE_SUSPEND_RESUME))
+    resume_suspended_connections (daemon);
+
   /* process events for connections */
   while (NULL != (pos = daemon->eready_tail))
     {
@@ -3054,6 +3114,26 @@ setup_epoll_to_listen (struct MHD_Daemon *daemon)
 #endif
       return MHD_NO;
     }
+  if ( (-1 != daemon->wpipe[0]) &&
+       (0 != (daemon->options & MHD_USE_SUSPEND_RESUME)) )
+    {
+      event.events = EPOLLIN | EPOLLET;
+      event.data.ptr = NULL;
+      event.data.fd = daemon->wpipe[0];
+      if (0 != epoll_ctl (daemon->epoll_fd,
+                          EPOLL_CTL_ADD,
+                          daemon->wpipe[0],
+                          &event))
+        {
+#if HAVE_MESSAGES
+          if (0 != (daemon->options & MHD_USE_DEBUG))
+            MHD_DLOG (daemon,
+                      "Call to epoll_ctl failed: %s\n",
+                      STRERROR (errno));
+#endif
+          return MHD_NO;
+        }
+    }
   daemon->listen_socket_in_epoll = MHD_YES;
   return MHD_YES;
 }
@@ -3274,6 +3354,16 @@ MHD_start_daemon_va (unsigned int flags,
       goto free_and_fail;
     }
 
+  if ( (0 != (flags & MHD_USE_SUSPEND_RESUME)) &&
+       (0 != (flags & MHD_USE_THREAD_PER_CONNECTION)) )
+    {
+#if HAVE_MESSAGES
+      MHD_DLOG (daemon,
+                "Combining MHD_USE_THREAD_PER_CONNECTION and MHD_USE_SUSPEND_RESUME is not supported.");
+#endif
+      goto free_and_fail;
+    }
+
 #ifdef __SYMBIAN32__
   if (0 != (flags & (MHD_USE_SELECT_INTERNALLY | MHD_USE_THREAD_PER_CONNECTION)))
     {
@@ -3588,6 +3678,38 @@ MHD_start_daemon_va (unsigned int flags,
           d->worker_pool_size = 0;
           d->worker_pool = NULL;
 
+          if ( (0 != (flags & MHD_USE_SUSPEND_RESUME)) &&
+#ifdef WINDOWS
+               (0 != SOCKETPAIR (AF_INET, SOCK_STREAM, IPPROTO_TCP, d->wpipe))
+#else
+               (0 != PIPE (d->wpipe))
+#endif
+             )
+            {
+#if HAVE_MESSAGES
+              MHD_DLOG (daemon,
+                        "Failed to create worker control pipe: %s\n",
+                        STRERROR (errno));
+#endif
+              goto thread_failed;
+            }
+#ifndef WINDOWS
+          if ( (0 == (flags & MHD_USE_POLL)) &&
+               (0 != (flags & MHD_USE_SUSPEND_RESUME)) &&
+               (d->wpipe[0] >= FD_SETSIZE) )
+            {
+#if HAVE_MESSAGES
+              MHD_DLOG (daemon,
+                        "file descriptor for worker control pipe exceeds maximum value\n");
+#endif
+              if (0 != CLOSE (d->wpipe[0]))
+                MHD_PANIC ("close failed\n");
+              if (0 != CLOSE (d->wpipe[1]))
+                MHD_PANIC ("close failed\n");
+              goto thread_failed;
+            }
+#endif
+
           /* Divide available connections evenly amongst the threads.
            * Thread indexes in [0, leftover_conns) each get one of the
            * leftover connections. */
@@ -3842,9 +3964,9 @@ MHD_stop_daemon (struct MHD_Daemon *daemon)
       /* MHD_USE_NO_LISTEN_SOCKET disables thread pools, hence we need to check */
       for (i = 0; i < daemon->worker_pool_size; ++i)
 	{
-	  if (-1 != daemon->wpipe[1])
+	  if (-1 != daemon->worker_pool[i].wpipe[1])
 	    {
-	      if (1 != WRITE (daemon->wpipe[1], "e", 1))
+	      if (1 != WRITE (daemon->worker_pool[i].wpipe[1], "e", 1))
 		MHD_PANIC ("failed to signal shutdown via pipe");
 	    }
 	  if (0 != (rc = pthread_join (daemon->worker_pool[i].pid, &unused)))

+ 24 - 0
src/microhttpd/internal.h

@@ -834,6 +834,15 @@ struct MHD_Connection
    */
   int tls_read_ready;
 
+  /**
+   * Is the connection suspended?
+   */
+  int suspended;
+
+  /**
+   * Is the connection wanting to resume?
+   */
+  int resuming;
 #endif
 };
 
@@ -893,6 +902,16 @@ struct MHD_Daemon
    */
   struct MHD_Connection *connections_tail;
 
+  /**
+   * Head of doubly-linked list of our current but suspended connections.
+   */
+  struct MHD_Connection *suspended_connections_head;
+
+  /**
+   * Tail of doubly-linked list of our current but suspended connections.
+   */
+  struct MHD_Connection *suspended_connections_tail;
+
   /**
    * Head of doubly-linked list of connections to clean up.
    */
@@ -1088,6 +1107,11 @@ struct MHD_Daemon
    */
   int shutdown;
 
+  /*
+   * Do we need to process resuming connections?
+   */
+  int resuming;
+
   /**
    * Limit on the number of parallel connections.
    */