Bläddra i källkod

pseudo-threaded network reads and writes

David Rose 17 år sedan
förälder
incheckning
c5957be27b

+ 55 - 4
panda/src/net/connection.cxx

@@ -45,6 +45,12 @@ Connection(ConnectionManager *manager, Socket_IP *socket) :
   _collect_tcp_interval = collect_tcp_interval;
   _queued_data_start = 0.0;
   _queued_count = 0;
+
+#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
+  // In the presence of SIMPLE_THREADS, we use non-blocking I/O.  We
+  // simulate blocking by yielding the thread.
+  _socket->SetNonBlocking();
+#endif
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -204,6 +210,12 @@ flush() {
   return do_flush();
 }
 
+/*
+This method is disabled.  We don't provide enough interface to use
+non-blocking I/O effectively at this level, so we shouldn't provide
+this call.  Specifically, we don't provide a way to query whether an
+operation failed because it would have blocked or not.
+
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_nonblock
 //       Access: Published
@@ -217,6 +229,7 @@ set_nonblock(bool flag) {
     _socket->SetBlocking();
   }
 }
+*/
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_linger
@@ -355,7 +368,14 @@ send_datagram(const NetDatagram &datagram, int tcp_header_size) {
     data += datagram.get_message();
     
     int bytes_to_send = data.length();
-    bool okflag = udp->SendTo(data, datagram.get_address().get_addr());
+    Socket_Address addr = datagram.get_address().get_addr();
+    bool okflag = udp->SendTo(data, addr);
+#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
+    while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR) {
+      Thread::force_yield();
+      okflag = udp->SendTo(data, addr);
+    }
+#endif  // SIMPLE_THREADS
     
     if (net_cat.is_debug()) {
       header.verify_datagram(datagram);
@@ -418,7 +438,14 @@ send_raw_datagram(const NetDatagram &datagram) {
     string data = datagram.get_message();
 
     ReMutexHolder holder(_write_mutex);
-    bool okflag = udp->SendTo(data, datagram.get_address().get_addr());
+    Socket_Address addr = datagram.get_address().get_addr();
+    bool okflag = udp->SendTo(data, addr);
+#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
+    while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR) {
+      Thread::force_yield();
+      okflag = udp->SendTo(data, addr);
+    }
+#endif  // SIMPLE_THREADS
     
     if (net_cat.is_spam()) {
       net_cat.spam()
@@ -466,12 +493,31 @@ do_flush() {
   Socket_TCP *tcp;
   DCAST_INTO_R(tcp, _socket, false);
 
-  bool okflag = (tcp->SendData(_queued_data) == (int)_queued_data.size());
+  string sending_data;
+  _queued_data.swap(sending_data);
 
-  _queued_data = string();
   _queued_count = 0;
   _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
 
+  int data_sent = tcp->SendData(sending_data);
+  bool okflag = (data_sent == (int)sending_data.size());
+#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
+  if (!okflag) {
+    int total_sent = 0;
+    if (data_sent > 0) {
+      total_sent += data_sent;
+    }
+    while (!okflag && (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) {
+      Thread::force_yield();
+      data_sent = tcp->SendData(sending_data.data() + total_sent, sending_data.size() - total_sent);
+      if (data_sent > 0) {
+        total_sent += data_sent;
+      }
+      okflag = (total_sent == (int)sending_data.size());
+    }
+  }
+#endif  // SIMPLE_THREADS
+
   return check_send_error(okflag);
 }
 
@@ -484,6 +530,11 @@ do_flush() {
 bool Connection::
 check_send_error(bool okflag) {
   if (!okflag) {
+    static ConfigVariableBool abort_send_error("abort-send-error", false);
+    if (abort_send_error) {
+      nassertr(false, false);
+    }
+
     // Assume any error means the connection has been reset; tell
     // our manager about it and ignore it.
     if (_manager != (ConnectionManager *)NULL) {

+ 1 - 1
panda/src/net/connection.h

@@ -48,7 +48,7 @@ PUBLISHED:
   bool flush();
 
   // Socket options.
-  void set_nonblock(bool flag);
+  //  void set_nonblock(bool flag);
   void set_linger(bool flag, double time);
   void set_reuse_addr(bool flag);
   void set_keep_alive(bool flag);

+ 10 - 1
panda/src/net/connectionListener.cxx

@@ -57,7 +57,16 @@ process_incoming_data(SocketInfo *sinfo) {
 
   Socket_Address addr;
   Socket_TCP *session = new Socket_TCP;
-  if (!socket->GetIncomingConnection(*session, addr)) {
+
+  bool got_connection = socket->GetIncomingConnection(*session, addr);
+#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
+  while (!got_connection && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
+    Thread::force_yield();
+    got_connection = socket->GetIncomingConnection(*session, addr);
+  }
+#endif  // SIMPLE_THREADS
+
+  if (!got_connection) {
     net_cat.error()
       << "Error when accepting new connection.\n";
     delete session;

+ 22 - 0
panda/src/net/connectionReader.cxx

@@ -562,6 +562,13 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
     int bytes_read =
       socket->RecvData(buffer + header_bytes_read,
                        _tcp_header_size - header_bytes_read);
+#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
+    while (bytes_read <= 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
+      Thread::force_yield();
+      bytes_read = socket->RecvData(buffer + header_bytes_read,
+                                    _tcp_header_size - header_bytes_read);
+    }
+#endif  // SIMPLE_THREADS
 
     if (bytes_read <= 0) {
       // The socket was closed.  Report that and return.
@@ -598,6 +605,15 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
     bytes_read =
       socket->RecvData(buffer, min(read_buffer_size,
                                    (int)(size - datagram.get_length())));
+#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
+    while (bytes_read <= 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
+      Thread::force_yield();
+      bytes_read =
+        socket->RecvData(buffer, min(read_buffer_size,
+                                     (int)(size - datagram.get_length())));
+    }
+#endif  // SIMPLE_THREADS
+
     char *dp = buffer;
 
     if (bytes_read <= 0) {
@@ -702,6 +718,12 @@ process_raw_incoming_tcp_data(SocketInfo *sinfo) {
   // Read as many bytes as we can.
   char buffer[read_buffer_size];
   int bytes_read = socket->RecvData(buffer, read_buffer_size);
+#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
+  while (bytes_read <= 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
+    Thread::force_yield();
+    bytes_read = socket->RecvData(buffer, read_buffer_size);
+  }
+#endif  // SIMPLE_THREADS
 
   if (bytes_read <= 0) {
     // The socket was closed.  Report that and return.

+ 20 - 26
panda/src/pstatclient/pStatClientImpl.cxx

@@ -374,30 +374,26 @@ send_hello() {
 ////////////////////////////////////////////////////////////////////
 void PStatClientImpl::
 report_new_collectors() {
-  nassertv(_is_connected);
-
-  if (_collectors_reported < _client->_num_collectors) {
-    // Empirically, we determined that you can't send more than about
-    // 1400 collectors at once without exceeding the 64K limit on a
-    // single datagram.  So we limit ourselves here to sending only
-    // half that many.
-    static const int max_collectors_at_once = 700;
-
-    while (_collectors_reported < _client->_num_collectors) {
-      PStatClientControlMessage message;
-      message._type = PStatClientControlMessage::T_define_collectors;
-      int i = 0;
-      while (_collectors_reported < _client->_num_collectors &&
-             i < max_collectors_at_once) {
-        message._collectors.push_back(_client->get_collector_def(_collectors_reported));
-        _collectors_reported++;
-        i++;
-      }
-
-      Datagram datagram;
-      message.encode(datagram);
-      _writer.send(datagram, _tcp_connection);
+  // Empirically, we determined that you can't send more than about
+  // 1400 collectors at once without exceeding the 64K limit on a
+  // single datagram.  So we limit ourselves here to sending only
+  // half that many.
+  static const int max_collectors_at_once = 700;
+  
+  while (_is_connected && _collectors_reported < _client->_num_collectors) {
+    PStatClientControlMessage message;
+    message._type = PStatClientControlMessage::T_define_collectors;
+    int i = 0;
+    while (_collectors_reported < _client->_num_collectors &&
+           i < max_collectors_at_once) {
+      message._collectors.push_back(_client->get_collector_def(_collectors_reported));
+      _collectors_reported++;
+      i++;
     }
+    
+    Datagram datagram;
+    message.encode(datagram);
+    _writer.send(datagram, _tcp_connection);
   }
 }
 
@@ -409,9 +405,7 @@ report_new_collectors() {
 ////////////////////////////////////////////////////////////////////
 void PStatClientImpl::
 report_new_threads() {
-  nassertv(_is_connected);
-
-  if (_threads_reported < _client->_num_threads) {
+  while (_is_connected && _threads_reported < _client->_num_threads) {
     PStatClientControlMessage message;
     message._type = PStatClientControlMessage::T_define_threads;
     message._first_thread_index = _threads_reported;