Переглянути джерело

tcp_header_size, some threading issues

David Rose 16 роки тому
батько
коміт
6e33ac4513

+ 6 - 0
panda/src/downloader/config_downloader.cxx

@@ -106,6 +106,12 @@ ConfigVariableInt http_max_connect_count
           "prevent the code from attempting runaway connections; this limit "
           "should never be reached in practice."));
 
+ConfigVariableInt tcp_header_size
+("tcp-header-size", 2,
+ PRC_DESC("Specifies the number of bytes to use to specify the datagram "
+          "length when writing a datagram on a TCP stream.  This may be "
+          "0, 2, or 4.  The server and client must agree on this value."));
+
 ConfigureFn(config_downloader) {
   init_libdownloader();
 }

+ 2 - 0
panda/src/downloader/config_downloader.h

@@ -47,6 +47,8 @@ extern ConfigVariableInt http_skip_body_size;
 extern ConfigVariableDouble http_idle_timeout;
 extern ConfigVariableInt http_max_connect_count;
 
+extern ConfigVariableInt tcp_header_size;
+
 extern EXPCL_PANDAEXPRESS void init_libdownloader();
 
 #endif

+ 75 - 25
panda/src/downloader/socketStream.I

@@ -13,22 +13,6 @@
 ////////////////////////////////////////////////////////////////////
 
 
-////////////////////////////////////////////////////////////////////
-//     Function: SSReader::Constructor
-//       Access: Public
-//  Description:
-////////////////////////////////////////////////////////////////////
-INLINE SSReader::
-SSReader(istream *stream) : _istream(stream) {
-  _data_expected = 0;
-
-#ifdef SIMULATE_NETWORK_DELAY
-  _delay_active = false;
-  _min_delay = 0.0;
-  _delay_variance = 0.0;
-#endif  // SIMULATE_NETWORK_DELAY
-}
-
 ////////////////////////////////////////////////////////////////////
 //     Function: SSReader::receive_datagram
 //       Access: Published
@@ -38,7 +22,7 @@ SSReader(istream *stream) : _istream(stream) {
 //               the data is not available; otherwise, returns false
 //               only if the socket closes.
 ////////////////////////////////////////////////////////////////////
-bool SSReader::
+INLINE bool SSReader::
 receive_datagram(Datagram &dg) {
 #ifdef SIMULATE_NETWORK_DELAY
   if (_delay_active) {
@@ -59,15 +43,29 @@ receive_datagram(Datagram &dg) {
 }
 
 ////////////////////////////////////////////////////////////////////
-//     Function: SSWriter::Constructor
-//       Access: Public
-//  Description:
+//     Function: SSReader::set_tcp_header_size
+//       Access: Published
+//  Description: Sets the header size for datagrams.  At the present,
+//               legal values for this are 0, 2, or 4; this specifies
+//               the number of bytes to use encode the datagram length
+//               at the start of each TCP datagram.  Sender and
+//               receiver must independently agree on this.
 ////////////////////////////////////////////////////////////////////
-INLINE SSWriter::
-SSWriter(ostream *stream) : _ostream(stream) {
-  _collect_tcp = collect_tcp;
-  _collect_tcp_interval = collect_tcp_interval;
-  _queued_data_start = 0.0;
+INLINE void SSReader::
+set_tcp_header_size(int tcp_header_size) {
+  nassertv(tcp_header_size == 0 || tcp_header_size == 2 || tcp_header_size == 4);
+  _tcp_header_size = tcp_header_size;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SSReader::get_tcp_header_size
+//       Access: Published
+//  Description: Returns the header size for datagrams.  See
+//               set_tcp_header_size().
+////////////////////////////////////////////////////////////////////
+INLINE int SSReader::
+get_tcp_header_size() const {
+  return _tcp_header_size;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -135,6 +133,32 @@ get_collect_tcp_interval() const {
   return _collect_tcp_interval;
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: SSWriter::set_tcp_header_size
+//       Access: Published
+//  Description: Sets the header size for datagrams.  At the present,
+//               legal values for this are 0, 2, or 4; this specifies
+//               the number of bytes to use encode the datagram length
+//               at the start of each TCP datagram.  Sender and
+//               receiver must independently agree on this.
+////////////////////////////////////////////////////////////////////
+INLINE void SSWriter::
+set_tcp_header_size(int tcp_header_size) {
+  nassertv(tcp_header_size == 0 || tcp_header_size == 2 || tcp_header_size == 4);
+  _tcp_header_size = tcp_header_size;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SSWriter::get_tcp_header_size
+//       Access: Published
+//  Description: Returns the header size for datagrams.  See
+//               set_tcp_header_size().
+////////////////////////////////////////////////////////////////////
+INLINE int SSWriter::
+get_tcp_header_size() const {
+  return _tcp_header_size;
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: SSWriter::consider_flush
 //       Access: Published
@@ -214,6 +238,32 @@ INLINE SocketStream::
 SocketStream(streambuf *buf) : iostream(buf), SSReader(this), SSWriter(this) {
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: SocketStream::set_tcp_header_size
+//       Access: Published
+//  Description: Sets the header size for datagrams.  At the present,
+//               legal values for this are 0, 2, or 4; this specifies
+//               the number of bytes to use encode the datagram length
+//               at the start of each TCP datagram.  Sender and
+//               receiver must independently agree on this.
+////////////////////////////////////////////////////////////////////
+INLINE void SocketStream::
+set_tcp_header_size(int tcp_header_size) {
+  SSReader::set_tcp_header_size(tcp_header_size);
+  SSWriter::set_tcp_header_size(tcp_header_size);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SocketStream::get_tcp_header_size
+//       Access: Published
+//  Description: Returns the header size for datagrams.  See
+//               set_tcp_header_size().
+////////////////////////////////////////////////////////////////////
+INLINE int SocketStream::
+get_tcp_header_size() const {
+  return SSReader::get_tcp_header_size();
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: SocketStream::flush
 //       Access: Published

+ 46 - 4
panda/src/downloader/socketStream.cxx

@@ -16,9 +16,27 @@
 #include "datagram.h"
 #include "datagramIterator.h"
 #include "httpChannel.h"
+#include "config_downloader.h"
 
 #ifdef HAVE_OPENSSL
 
+////////////////////////////////////////////////////////////////////
+//     Function: SSReader::Constructor
+//       Access: Public
+//  Description:
+////////////////////////////////////////////////////////////////////
+SSReader::
+SSReader(istream *stream) : _istream(stream) {
+  _data_expected = 0;
+  _tcp_header_size = tcp_header_size;
+
+#ifdef SIMULATE_NETWORK_DELAY
+  _delay_active = false;
+  _min_delay = 0.0;
+  _delay_variance = 0.0;
+#endif  // SIMULATE_NETWORK_DELAY
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: SSReader::Destructor
 //       Access: Public, Virtual
@@ -39,9 +57,12 @@ SSReader::
 ////////////////////////////////////////////////////////////////////
 bool SSReader::
 do_receive_datagram(Datagram &dg) {
+  if (_tcp_header_size == 0) {
+    _data_expected = _data_so_far.length();
+  }
   if (_data_expected == 0) {
     // Read the first two bytes: the datagram length.
-    while (_data_so_far.length() < 2) {
+    while (_data_so_far.length() < _tcp_header_size) {
       int ch = _istream->get();
       if (_istream->eof() || _istream->fail()) {
         _istream->clear();
@@ -52,8 +73,12 @@ do_receive_datagram(Datagram &dg) {
 
     Datagram header(_data_so_far);
     DatagramIterator di(header);
-    _data_expected = di.get_uint16();
-    _data_so_far = string();
+    if (_tcp_header_size == 2) {
+      _data_expected = di.get_uint16();
+    } else if (_tcp_header_size == 4) {
+      _data_expected = di.get_uint32();
+    }
+    _data_so_far = _data_so_far.substr(_tcp_header_size);
 
     if (_data_expected == 0) {
       // Empty datagram.
@@ -186,6 +211,19 @@ get_delayed(Datagram &datagram) {
 }
 #endif  // SIMULATE_NETWORK_DELAY
 
+////////////////////////////////////////////////////////////////////
+//     Function: SSWriter::Constructor
+//       Access: Public
+//  Description:
+////////////////////////////////////////////////////////////////////
+SSWriter::
+SSWriter(ostream *stream) : _ostream(stream) {
+  _collect_tcp = collect_tcp;
+  _collect_tcp_interval = collect_tcp_interval;
+  _queued_data_start = 0.0;
+  _tcp_header_size = tcp_header_size;
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: SSWriter::Destructor
 //       Access: Public, Virtual
@@ -207,7 +245,11 @@ SSWriter::
 bool SSWriter::
 send_datagram(const Datagram &dg) {
   Datagram header;
-  header.add_uint16(dg.get_length());
+  if (_tcp_header_size == 2) {
+    header.add_uint16(dg.get_length());
+  } else if (_tcp_header_size == 4) {
+    header.add_uint32(dg.get_length());
+  }
 
   // These two writes don't generate two socket calls, because the
   // socket stream is always buffered.

+ 13 - 2
panda/src/downloader/socketStream.h

@@ -40,7 +40,7 @@ class HTTPChannel;
 ////////////////////////////////////////////////////////////////////
 class EXPCL_PANDAEXPRESS SSReader {
 public:
-  INLINE SSReader(istream *stream);
+  SSReader(istream *stream);
   virtual ~SSReader();
 
 PUBLISHED:
@@ -49,12 +49,16 @@ PUBLISHED:
   virtual bool is_closed() = 0;
   virtual void close() = 0;
 
+  INLINE void set_tcp_header_size(int tcp_header_size);
+  INLINE int get_tcp_header_size() const;
+
 private:
   bool do_receive_datagram(Datagram &dg);
 
   istream *_istream;
   size_t _data_expected;
   string _data_so_far;
+  int _tcp_header_size;
 
 #ifdef SIMULATE_NETWORK_DELAY
 PUBLISHED:
@@ -88,7 +92,7 @@ private:
 ////////////////////////////////////////////////////////////////////
 class EXPCL_PANDAEXPRESS SSWriter {
 public:
-  INLINE SSWriter(ostream *stream);
+  SSWriter(ostream *stream);
   virtual ~SSWriter();
 
 PUBLISHED:
@@ -102,6 +106,9 @@ PUBLISHED:
   INLINE void set_collect_tcp_interval(double interval);
   INLINE double get_collect_tcp_interval() const;
 
+  INLINE void set_tcp_header_size(int tcp_header_size);
+  INLINE int get_tcp_header_size() const;
+
   INLINE bool consider_flush();
   INLINE bool flush();
 
@@ -110,6 +117,7 @@ private:
   bool _collect_tcp;
   double _collect_tcp_interval;
   double _queued_data_start;
+  int _tcp_header_size;
 };
 
 ////////////////////////////////////////////////////////////////////
@@ -177,6 +185,9 @@ PUBLISHED:
   virtual bool is_closed() = 0;
   virtual void close() = 0;
 
+  INLINE void set_tcp_header_size(int tcp_header_size);
+  INLINE int get_tcp_header_size() const;
+
   INLINE bool flush();
 };
 

+ 1 - 1
panda/src/net/Sources.pp

@@ -7,7 +7,7 @@
 #begin lib_target
   #define TARGET net
   #define LOCAL_LIBS \
-    express pandabase nativenet pipeline
+    express downloader pandabase nativenet pipeline
 
   #define COMBINED_SOURCES $[TARGET]_composite1.cxx $[TARGET]_composite2.cxx
 

+ 5 - 3
panda/src/net/connection.cxx

@@ -379,7 +379,7 @@ send_datagram(const NetDatagram &datagram, int tcp_header_size) {
 
     bool okflag = udp->SendTo(data, addr);
 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
-    while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR) {
+    while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
       Thread::force_yield();
       okflag = udp->SendTo(data, addr);
     }
@@ -445,7 +445,7 @@ send_raw_datagram(const NetDatagram &datagram) {
     Socket_Address addr = datagram.get_address().get_addr();
     bool okflag = udp->SendTo(data, addr);
 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
-    while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR) {
+    while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
       Thread::force_yield();
       okflag = udp->SendTo(data, addr);
     }
@@ -511,7 +511,9 @@ do_flush() {
     if (data_sent > 0) {
       total_sent += data_sent;
     }
-    while (!okflag && (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) {
+    double last_report = 0;
+    while (!okflag && tcp->Active() &&
+           (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) {
       Thread::force_yield();
       data_sent = tcp->SendData(sending_data.data() + total_sent, sending_data.size() - total_sent);
       if (data_sent > 0) {

+ 11 - 4
panda/src/net/connectionReader.cxx

@@ -25,6 +25,7 @@
 #include "lightMutexHolder.h"
 #include "pnotify.h"
 #include "atomicAdjust.h"
+#include "config_downloader.h"
 
 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
 
@@ -112,7 +113,7 @@ ConnectionReader(ConnectionManager *manager, int num_threads) :
   }
 
   _raw_mode = false;
-  _tcp_header_size = datagram_tcp16_header_size;
+  _tcp_header_size = tcp_header_size;
   _polling = (num_threads <= 0);
 
   _shutdown = false;
@@ -602,7 +603,8 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
       socket->RecvData(buffer + header_bytes_read,
                        _tcp_header_size - header_bytes_read);
 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
-    while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
+    while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
+           socket->Active()) {
       Thread::force_yield();
       bytes_read = socket->RecvData(buffer + header_bytes_read,
                                     _tcp_header_size - header_bytes_read);
@@ -645,7 +647,8 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
       socket->RecvData(buffer, min(read_buffer_size,
                                    (int)(size - datagram.get_length())));
 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
-    while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
+    while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
+           socket->Active()) {
       Thread::force_yield();
       bytes_read =
         socket->RecvData(buffer, min(read_buffer_size,
@@ -779,7 +782,8 @@ process_raw_incoming_tcp_data(SocketInfo *sinfo) {
   char buffer[read_buffer_size];
   int bytes_read = socket->RecvData(buffer, read_buffer_size);
 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
-  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
+  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR && 
+         socket->Active()) {
     Thread::force_yield();
     bytes_read = socket->RecvData(buffer, read_buffer_size);
   }
@@ -837,6 +841,9 @@ thread_run(int thread_index) {
       get_next_available_socket(false, thread_index);
     if (sinfo != (SocketInfo *)NULL) {
       process_incoming_data(sinfo);
+      Thread::consider_yield();
+    } else {
+      Thread::force_yield();
     }
   }
 }

+ 3 - 1
panda/src/net/connectionWriter.cxx

@@ -19,6 +19,7 @@
 #include "socket_tcp.h"
 #include "socket_udp.h"
 #include "pnotify.h"
+#include "config_downloader.h"
 
 ////////////////////////////////////////////////////////////////////
 //     Function: ConnectionWriter::WriterThread::Constructor
@@ -70,7 +71,7 @@ ConnectionWriter(ConnectionManager *manager, int num_threads) :
   }
 
   _raw_mode = false;
-  _tcp_header_size = datagram_tcp16_header_size;
+  _tcp_header_size = tcp_header_size;
   _immediate = (num_threads <= 0);
 
   int i;
@@ -353,6 +354,7 @@ thread_run(int thread_index) {
     } else {
       datagram.get_connection()->send_datagram(datagram, _tcp_header_size);
     }
+    Thread::consider_yield();
   }
 }