Kaynağa Gözat

ConnectionManager::wait_for_readers()

David Rose 14 yıl önce
ebeveyn
işleme
78abcf23e7

+ 25 - 6
panda/src/net/config_net.cxx

@@ -77,12 +77,12 @@ get_net_error_abort() {
 }
 }
 
 
 double
 double
-get_max_poll_cycle() {
-  static ConfigVariableDouble *max_poll_cycle = NULL;
+get_net_max_poll_cycle() {
+  static ConfigVariableDouble *net_max_poll_cycle = NULL;
 
 
-  if (max_poll_cycle == (ConfigVariableDouble *)NULL) {
-    max_poll_cycle = new ConfigVariableDouble
-      ("max-poll-cycle", 0.2,
+  if (net_max_poll_cycle == (ConfigVariableDouble *)NULL) {
+    net_max_poll_cycle = new ConfigVariableDouble
+      ("net-max-poll-cycle", 0.2,
        PRC_DESC("Specifies the maximum amount of time, in seconds, to "
        PRC_DESC("Specifies the maximum amount of time, in seconds, to "
                 "continue to read data within one cycle of the poll() "
                 "continue to read data within one cycle of the poll() "
                 "call.  If this is negative, the program will wait as "
                 "call.  If this is negative, the program will wait as "
@@ -92,7 +92,26 @@ get_max_poll_cycle() {
                 "in faster than it can be processed."));
                 "in faster than it can be processed."));
   }
   }
 
 
-  return *max_poll_cycle;
+  return *net_max_poll_cycle;
+}
+
+double
+get_net_max_block() {
+  static ConfigVariableDouble *net_max_block = NULL;
+
+  if (net_max_block == (ConfigVariableDouble *)NULL) {
+    net_max_block = new ConfigVariableDouble
+      ("net-max-block", 0.01,
+       PRC_DESC("Specifies the maximum amount of time, in seconds, to "
+                "completely block the process during any blocking wait "
+                "in the net subsystem.  This is an internal timeout only, "
+                "and gives the net subsystem a chance to detect things "
+                "like explicitly-closed connections in another thread; it "
+                "does not affect the blocking behavior at the high "
+                "level.")); 
+  }
+
+  return *net_max_block;
 }
 }
 
 
 // This function is used in the ReaderThread and WriterThread
 // This function is used in the ReaderThread and WriterThread

+ 4 - 1
panda/src/net/config_net.h

@@ -18,6 +18,7 @@
 #include "pandabase.h"
 #include "pandabase.h"
 #include "notifyCategoryProxy.h"
 #include "notifyCategoryProxy.h"
 #include "configVariableInt.h"
 #include "configVariableInt.h"
+#include "configVariableDouble.h"
 #include "configVariableBool.h"
 #include "configVariableBool.h"
 #include "configVariableEnum.h"
 #include "configVariableEnum.h"
 #include "threadPriority.h"
 #include "threadPriority.h"
@@ -29,11 +30,13 @@ NotifyCategoryDecl(net, EXPCL_PANDA_NET, EXPTP_PANDA_NET);
 extern int get_net_max_write_queue();
 extern int get_net_max_write_queue();
 extern int get_net_max_response_queue();
 extern int get_net_max_response_queue();
 extern bool get_net_error_abort();
 extern bool get_net_error_abort();
-extern double get_max_poll_cycle();
+extern double get_net_max_poll_cycle();
+extern double get_net_max_block();
 extern string make_thread_name(const string &thread_name, int thread_index);
 extern string make_thread_name(const string &thread_name, int thread_index);
 
 
 extern ConfigVariableInt net_max_read_per_epoch;
 extern ConfigVariableInt net_max_read_per_epoch;
 extern ConfigVariableInt net_max_write_per_epoch;
 extern ConfigVariableInt net_max_write_per_epoch;
+
 extern ConfigVariableEnum<ThreadPriority> net_thread_priority;
 extern ConfigVariableEnum<ThreadPriority> net_thread_priority;
 
 
 extern EXPCL_PANDA_NET void init_libnet();
 extern EXPCL_PANDA_NET void init_libnet();

+ 105 - 10
panda/src/net/connectionManager.cxx

@@ -27,7 +27,7 @@
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionManager::Constructor
 //     Function: ConnectionManager::Constructor
-//       Access: Public
+//       Access: Published
 //  Description:
 //  Description:
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 ConnectionManager::
 ConnectionManager::
@@ -37,7 +37,7 @@ ConnectionManager() : _set_mutex("ConnectionManager::_set_mutex")
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionManager::Destructor
 //     Function: ConnectionManager::Destructor
-//       Access: Public, Virtual
+//       Access: Published, Virtual
 //  Description:
 //  Description:
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 ConnectionManager::
 ConnectionManager::
@@ -56,7 +56,7 @@ ConnectionManager::
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionManager::open_UDP_connection
 //     Function: ConnectionManager::open_UDP_connection
-//       Access: Public
+//       Access: Published
 //  Description: Opens a socket for sending and/or receiving UDP
 //  Description: Opens a socket for sending and/or receiving UDP
 //               packets.  If the port number is greater than zero,
 //               packets.  If the port number is greater than zero,
 //               the UDP connection will be opened for listening on
 //               the UDP connection will be opened for listening on
@@ -105,7 +105,7 @@ open_UDP_connection(int port) {
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionManager::open_TCP_server_rendezvous
 //     Function: ConnectionManager::open_TCP_server_rendezvous
-//       Access: Public
+//       Access: Published
 //  Description: Creates a socket to be used as a rendezvous socket
 //  Description: Creates a socket to be used as a rendezvous socket
 //               for a server to listen for TCP connections.  The
 //               for a server to listen for TCP connections.  The
 //               socket returned by this call should only be added to
 //               socket returned by this call should only be added to
@@ -128,7 +128,7 @@ open_TCP_server_rendezvous(int port, int backlog) {
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionManager::open_TCP_server_rendezvous
 //     Function: ConnectionManager::open_TCP_server_rendezvous
-//       Access: Public
+//       Access: Published
 //  Description: Creates a socket to be used as a rendezvous socket
 //  Description: Creates a socket to be used as a rendezvous socket
 //               for a server to listen for TCP connections.  The
 //               for a server to listen for TCP connections.  The
 //               socket returned by this call should only be added to
 //               socket returned by this call should only be added to
@@ -158,7 +158,7 @@ open_TCP_server_rendezvous(const string &hostname, int port, int backlog) {
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionManager::open_TCP_server_rendezvous
 //     Function: ConnectionManager::open_TCP_server_rendezvous
-//       Access: Public
+//       Access: Published
 //  Description: Creates a socket to be used as a rendezvous socket
 //  Description: Creates a socket to be used as a rendezvous socket
 //               for a server to listen for TCP connections.  The
 //               for a server to listen for TCP connections.  The
 //               socket returned by this call should only be added to
 //               socket returned by this call should only be added to
@@ -200,7 +200,7 @@ open_TCP_server_rendezvous(const NetAddress &address, int backlog) {
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionManager::open_TCP_client_connection
 //     Function: ConnectionManager::open_TCP_client_connection
-//       Access: Public
+//       Access: Published
 //  Description: Attempts to establish a TCP client connection to a
 //  Description: Attempts to establish a TCP client connection to a
 //               server at the indicated address.  If the connection
 //               server at the indicated address.  If the connection
 //               is not established within timeout_ms milliseconds, a
 //               is not established within timeout_ms milliseconds, a
@@ -269,7 +269,7 @@ open_TCP_client_connection(const NetAddress &address, int timeout_ms) {
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionManager::open_TCP_client_connection
 //     Function: ConnectionManager::open_TCP_client_connection
-//       Access: Public
+//       Access: Published
 //  Description: This is a shorthand version of the function to
 //  Description: This is a shorthand version of the function to
 //               directly establish communications to a named host and
 //               directly establish communications to a named host and
 //               port.
 //               port.
@@ -287,7 +287,7 @@ open_TCP_client_connection(const string &hostname, int port,
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionManager::close_connection
 //     Function: ConnectionManager::close_connection
-//       Access: Public
+//       Access: Published
 //  Description: Terminates a UDP or TCP socket previously opened.
 //  Description: Terminates a UDP or TCP socket previously opened.
 //               This also removes it from any associated
 //               This also removes it from any associated
 //               ConnectionReader or ConnectionListeners.
 //               ConnectionReader or ConnectionListeners.
@@ -340,9 +340,104 @@ close_connection(const PT(Connection) &connection) {
   return true;
   return true;
 }
 }
 
 
+
+////////////////////////////////////////////////////////////////////
+//     Function: ConnectionManager::wait_for_readers
+//       Access: Published
+//  Description: Blocks the process for timeout number of seconds, or
+//               until any data is available on any of the
+//               non-threaded ConnectionReaders or
+//               ConnectionListeners, whichever comes first.  The
+//               return value is true if there is data available (but
+//               you have to iterate through all readers to find it),
+//               or false if the timeout occurred without any data.
+//
+//               If the timeout value is negative, this will block
+//               forever or until data is available.
+//
+//               This only works if all ConnectionReaders and
+//               ConnectionListeners are non-threaded.  If any
+//               threaded ConnectionReaders are part of the
+//               ConnectionManager, the timeout value is implicitly
+//               treated as 0.
+////////////////////////////////////////////////////////////////////
+bool ConnectionManager::
+wait_for_readers(double timeout) {
+  bool block_forever = false;
+  if (timeout < 0.0) {
+    block_forever = true;
+    timeout = 0.0;
+  }
+
+  TrueClock *clock = TrueClock::get_global_ptr();
+  double now = clock->get_short_time();
+  double stop = now + timeout;
+  do {
+    Socket_fdset fdset;
+    fdset.clear();
+    bool any_threaded = false;
+    
+    {
+      LightMutexHolder holder(_set_mutex);
+      
+      Readers::iterator ri;
+      for (ri = _readers.begin(); ri != _readers.end(); ++ri) {
+        ConnectionReader *reader = (*ri);
+        if (reader->is_polling()) {
+          // If it's a polling reader, we can wait for its socket.
+          // (If it's a threaded reader, we can't do anything here.)
+          reader->accumulate_fdset(fdset);
+        } else {
+          any_threaded = true;
+          stop = now;
+          block_forever = false;
+        }
+      }
+    }
+
+    double wait_timeout = get_net_max_block();
+    if (!block_forever) { 
+      wait_timeout = min(wait_timeout, stop - now);
+    }
+
+    PN_uint32 wait_timeout_ms = (PN_uint32)(wait_timeout * 1000.0);
+    if (any_threaded) {
+      // If there are any threaded ConnectionReaders, we can't block
+      // at all.
+      wait_timeout_ms = 0;
+    }
+#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
+    // In the presence of SIMPLE_THREADS, we never wait at all,
+    // but rather we yield the thread if we come up empty (so that
+    // we won't block the entire process).
+    wait_timeout_ms = 0;
+#endif
+    int num_results = fdset.WaitForRead(false, wait_timeout_ms);
+    if (num_results != 0) {
+      // If we got an answer (or an error), return success.  The
+      // caller can then figure out what happened.
+      if (num_results < 0) {
+        // Go ahead and yield the timeslice if we got an error.
+        Thread::force_yield();
+      }
+      return true;
+    }
+
+    // No answer yet, so yield and wait some more.  We don't actually
+    // block forever, even in the threaded case, so we can detect
+    // ConnectionReaders being added and removed and such.
+    Thread::force_yield();
+
+    now = clock->get_short_time();
+  } while (now < stop || block_forever);
+
+  // Timeout occurred; no data.
+  return false;
+}
+
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionManager::get_host_name
 //     Function: ConnectionManager::get_host_name
-//       Access: Public, Static
+//       Access: Published, Static
 //  Description: Returns the name of this particular machine on the
 //  Description: Returns the name of this particular machine on the
 //               network, if available, or the empty string if the
 //               network, if available, or the empty string if the
 //               hostname cannot be determined.
 //               hostname cannot be determined.

+ 1 - 0
panda/src/net/connectionManager.h

@@ -61,6 +61,7 @@ PUBLISHED:
                                                      int timeout_ms);
                                                      int timeout_ms);
 
 
   bool close_connection(const PT(Connection) &connection);
   bool close_connection(const PT(Connection) &connection);
+  BLOCKING bool wait_for_readers(double timeout);
 
 
   static string get_host_name();
   static string get_host_name();
 
 

+ 26 - 6
panda/src/net/connectionReader.cxx

@@ -29,8 +29,6 @@
 
 
 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
 
 
-static const int max_timeout_ms = 100;
-
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionReader::SocketInfo::Constructor
 //     Function: ConnectionReader::SocketInfo::Constructor
 //       Access: Public
 //       Access: Public
@@ -296,7 +294,7 @@ poll() {
 
 
   SocketInfo *sinfo = get_next_available_socket(false, -2);
   SocketInfo *sinfo = get_next_available_socket(false, -2);
   if (sinfo != (SocketInfo *)NULL) {
   if (sinfo != (SocketInfo *)NULL) {
-    double max_poll_cycle = get_max_poll_cycle();
+    double max_poll_cycle = get_net_max_poll_cycle();
     if (max_poll_cycle < 0.0) {
     if (max_poll_cycle < 0.0) {
       // Continue to read all data.
       // Continue to read all data.
       while (sinfo != (SocketInfo *)NULL) {
       while (sinfo != (SocketInfo *)NULL) {
@@ -921,7 +919,7 @@ get_next_available_socket(bool allow_block, int current_thread_index) {
       _next_index = 0;
       _next_index = 0;
 
 
       if (!_shutdown) {
       if (!_shutdown) {
-        PN_uint32 timeout = max_timeout_ms;
+        PN_uint32 timeout = (PN_uint32)(get_net_max_block() * 1000.0);
         if (!allow_block) {
         if (!allow_block) {
           timeout = 0;
           timeout = 0;
         }
         }
@@ -936,14 +934,16 @@ get_next_available_socket(bool allow_block, int current_thread_index) {
       }
       }
 
 
       if (_num_results == 0 && allow_block) {
       if (_num_results == 0 && allow_block) {
-        // If we reached max_timeout_ms, go back and reconsider.  (We
+        // If we reached net_max_block, go back and reconsider.  (We
         // never timeout indefinitely, so we can check the shutdown
         // never timeout indefinitely, so we can check the shutdown
         // flag every once in a while.)
         // flag every once in a while.)
         interrupted = true;
         interrupted = true;
         Thread::force_yield();
         Thread::force_yield();
 
 
       } else if (_num_results < 0) {
       } else if (_num_results < 0) {
-        // If we had an error, just return.
+        // If we had an error, just return.  But yield the timeslice
+        // first.
+        Thread::force_yield();
         return (SocketInfo *)NULL;
         return (SocketInfo *)NULL;
       }
       }
     } while (!_shutdown && interrupted);
     } while (!_shutdown && interrupted);
@@ -995,3 +995,23 @@ rebuild_select_list() {
     _removed_sockets.swap(still_busy_sockets);
     _removed_sockets.swap(still_busy_sockets);
   }
   }
 }
 }
+
+////////////////////////////////////////////////////////////////////
+//     Function: ConnectionReader::accumulate_fdset
+//       Access: Private
+//  Description: Adds the sockets from this ConnectionReader (or
+//               ConnectionListener) to the indicated fdset.  This is
+//               used by ConnectionManager::block() to build an fdset
+//               of all attached readers.
+////////////////////////////////////////////////////////////////////
+void ConnectionReader::
+accumulate_fdset(Socket_fdset &fdset) {
+  LightMutexHolder holder(_sockets_mutex);
+  Sockets::const_iterator si;
+  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
+    SocketInfo *sinfo = (*si);
+    if (!sinfo->_busy && !sinfo->_error) {
+      fdset.setForSocket(*sinfo->get_socket());
+    }
+  }
+}

+ 1 - 0
panda/src/net/connectionReader.h

@@ -133,6 +133,7 @@ private:
                                         int current_thread_index);
                                         int current_thread_index);
 
 
   void rebuild_select_list();
   void rebuild_select_list();
+  void accumulate_fdset(Socket_fdset &fdset);
 
 
 private:
 private:
   bool _raw_mode;
   bool _raw_mode;