Browse Source

simulate_network_delay for http too

David Rose 20 years ago
parent
commit
ee7f0dceb2

+ 1 - 1
direct/src/distributed/Sources.pp

@@ -8,7 +8,7 @@
   #define LOCAL_LIBS \
   #define LOCAL_LIBS \
     directbase dcparser
     directbase dcparser
   #define OTHER_LIBS \
   #define OTHER_LIBS \
-    downloader:c panda:m express:c pandaexpress:m \
+    event:c downloader:c panda:m express:c pandaexpress:m \
     interrogatedb:c dconfig:c dtoolconfig:m \
     interrogatedb:c dconfig:c dtoolconfig:m \
     dtoolutil:c dtoolbase:c dtool:m
     dtoolutil:c dtoolbase:c dtool:m
   #if $[and $[HAVE_NET],$[HAVE_NSPR]] \
   #if $[and $[HAVE_NET],$[HAVE_NSPR]] \

+ 72 - 0
direct/src/distributed/cConnectionRepository.cxx

@@ -90,6 +90,25 @@ set_connection_http(HTTPChannel *channel) {
   disconnect();
   disconnect();
   nassertv(channel->is_connection_ready());
   nassertv(channel->is_connection_ready());
   _http_conn = channel->get_connection();
   _http_conn = channel->get_connection();
+#ifdef SIMULATE_NETWORK_DELAY
+  if (min_lag != 0.0 || max_lag != 0.0) {
+    _http_conn->start_delay(min_lag, max_lag);
+  }
+#endif
+}
+#endif  // HAVE_SSL
+
+#ifdef HAVE_SSL
+////////////////////////////////////////////////////////////////////
+//     Function: CConnectionRepository::get_stream
+//       Access: Published
+//  Description: Returns the SocketStream that internally represents
+//               the already-established HTTP connection.  Returns
+//               NULL if there is no current HTTP connection.
+////////////////////////////////////////////////////////////////////
+SocketStream *CConnectionRepository::
+get_stream() {
+  return _http_conn;
 }
 }
 #endif  // HAVE_SSL
 #endif  // HAVE_SSL
 
 
@@ -120,6 +139,59 @@ try_connect_nspr(const URLSpec &url) {
 }
 }
 #endif  // HAVE_NSPR
 #endif  // HAVE_NSPR
 
 
+#ifdef SIMULATE_NETWORK_DELAY
+////////////////////////////////////////////////////////////////////
+//     Function: CConnectionRepository::start_delay
+//       Access: Published
+//  Description: Enables a simulated network latency.  All datagrams
+//               received from this point on will be held for a random
+//               interval of least min_delay seconds, and no more than
+//               max_delay seconds, before being visible.  It is as if
+//               datagrams suddenly took much longer to arrive.
+//
+//               This should *only* be called if the underlying socket
+//               is non-blocking.  If you call this on a blocking
+//               socket, it will force all datagrams to be held up
+//               until the socket closes.
+////////////////////////////////////////////////////////////////////
+void CConnectionRepository::
+start_delay(double min_delay, double max_delay) {
+  if (min_delay != 0.0 || max_delay != 0.0) {
+#ifdef HAVE_NSPR
+    _qcr.start_delay(min_delay, max_delay);
+#endif  // HAVE_NSPR
+#ifdef HAVE_SSL
+    if (_http_conn != (SocketStream *)NULL) {
+      _http_conn->start_delay(min_delay, max_delay);
+    }
+#endif  // HAVE_SSL
+  } else {
+    stop_delay();
+  }
+}
+#endif  // SIMULATE_NETWORK_DELAY
+
+#ifdef SIMULATE_NETWORK_DELAY
+////////////////////////////////////////////////////////////////////
+//     Function: CConnectionRepository::stop_delay
+//       Access: Published
+//  Description: Disables the simulated network latency started by a
+//               previous call to start_delay().  Datagrams will once
+//               again be visible as soon as they are received.
+////////////////////////////////////////////////////////////////////
+void CConnectionRepository::
+stop_delay() {
+#ifdef HAVE_NSPR
+  _qcr.stop_delay();
+#endif  // HAVE_NSPR
+#ifdef HAVE_SSL
+  if (_http_conn != (SocketStream *)NULL) {
+    _http_conn->stop_delay();
+  }
+#endif  // HAVE_SSL
+}
+#endif  // SIMULATE_NETWORK_DELAY
+
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //     Function: CConnectionRepository::check_datagram
 //     Function: CConnectionRepository::check_datagram
 //       Access: Published
 //       Access: Published

+ 6 - 0
direct/src/distributed/cConnectionRepository.h

@@ -69,6 +69,7 @@ PUBLISHED:
 
 
 #ifdef HAVE_SSL
 #ifdef HAVE_SSL
   void set_connection_http(HTTPChannel *channel);
   void set_connection_http(HTTPChannel *channel);
+  SocketStream *get_stream();
 #endif
 #endif
 #ifdef HAVE_NSPR
 #ifdef HAVE_NSPR
   bool try_connect_nspr(const URLSpec &url);
   bool try_connect_nspr(const URLSpec &url);
@@ -78,6 +79,11 @@ PUBLISHED:
   INLINE QueuedConnectionReader &get_qcr();
   INLINE QueuedConnectionReader &get_qcr();
 #endif
 #endif
 
 
+#ifdef SIMULATE_NETWORK_DELAY
+  void start_delay(double min_delay, double max_delay);
+  void stop_delay();
+#endif
+
   bool check_datagram();
   bool check_datagram();
   INLINE void get_datagram(Datagram &dg);
   INLINE void get_datagram(Datagram &dg);
   INLINE void get_datagram_iterator(DatagramIterator &di);
   INLINE void get_datagram_iterator(DatagramIterator &di);

+ 2 - 2
direct/src/showbase/ShowBase.py

@@ -509,7 +509,7 @@ class ShowBase(DirectObject.DirectObject):
             return self.win.isValid()
             return self.win.isValid()
         return 0
         return 0
 
 
-    def openMainWindow(self):
+    def openMainWindow(self, *args, **kw):
         """
         """
         Creates the initial, main window for the application, and sets
         Creates the initial, main window for the application, and sets
         up the mouse and render2d structures appropriately for it.  If
         up the mouse and render2d structures appropriately for it.  If
@@ -534,7 +534,7 @@ class ShowBase(DirectObject.DirectObject):
             self.closeWindow(self.win)
             self.closeWindow(self.win)
 
 
         # Open a new window.
         # Open a new window.
-        self.openWindow()
+        self.openWindow(*args, **kw)
         if self.win == None:
         if self.win == None:
             self.win = oldWin
             self.win = oldWin
             self.winList.append(oldWin)
             self.winList.append(oldWin)

+ 7 - 1
dtool/Config.pp

@@ -298,7 +298,13 @@
 // to help track memory leaks, and also report total memory usage on
 // to help track memory leaks, and also report total memory usage on
 // PStats.  There is some small overhead for having this ability
 // PStats.  There is some small overhead for having this ability
 // available, even if it is unused.
 // available, even if it is unused.
-#defer DO_MEMORY_USAGE $[<= $[OPTIMIZE], 2]
+#defer DO_MEMORY_USAGE $[<= $[OPTIMIZE], 3]
+
+// This option compiles in support for simulating network delay via
+// the min-lag and max-lag prc variables.  It adds a tiny bit of
+// overhead even when it is not activated, so it is typically enabled
+// only in a development build.
+#defer SIMULATE_NETWORK_DELAY $[<= $[OPTIMIZE], 3]
 
 
 // Do you want to compile in support for pipelining?  This enables
 // Do you want to compile in support for pipelining?  This enables
 // setting and accessing multiple different copies of frame-specific
 // setting and accessing multiple different copies of frame-specific

+ 3 - 0
dtool/LocalSetup.pp

@@ -223,6 +223,9 @@ $[cdefine TRACK_IN_INTERPRETER]
 /* Define if we want to enable track-memory-usage.  */
 /* Define if we want to enable track-memory-usage.  */
 $[cdefine DO_MEMORY_USAGE]
 $[cdefine DO_MEMORY_USAGE]
 
 
+/* Define if we want to enable min-lag and max-lag.  */
+$[cdefine SIMULATE_NETWORK_DELAY]
+
 /* Define if we want to compile in support for pipelining.  */
 /* Define if we want to compile in support for pipelining.  */
 $[cdefine DO_PIPELINING]
 $[cdefine DO_PIPELINING]
 
 

+ 0 - 2
panda/src/downloader/bioStreamPtr.h

@@ -46,8 +46,6 @@ public:
   INLINE void set_stream(BioStream *stream);
   INLINE void set_stream(BioStream *stream);
   INLINE BioStream *get_stream() const;
   INLINE BioStream *get_stream() const;
 
 
-  bool connect() const;
-  
 private:
 private:
   BioStream *_stream;
   BioStream *_stream;
 };
 };

+ 75 - 106
panda/src/downloader/socketStream.I

@@ -18,29 +18,64 @@
 
 
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: ISocketStream::Constructor
+//     Function: SSReader::Constructor
 //       Access: Public
 //       Access: Public
 //  Description:
 //  Description:
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE ISocketStream::
-ISocketStream(streambuf *buf) : istream(buf) {
+INLINE SSReader::
+SSReader(istream *stream) : _istream(stream) {
   _data_expected = 0;
   _data_expected = 0;
+
+#ifdef SIMULATE_NETWORK_DELAY
+  _delay_active = false;
+  _min_delay = 0.0;
+  _delay_variance = 0.0;
+#endif  // SIMULATE_NETWORK_DELAY
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: OSocketStream::Constructor
+//     Function: SSReader::receive_datagram
+//       Access: Published
+//  Description: Receives a datagram over the socket by expecting a
+//               little-endian 16-bit byte count as a prefix.  If the
+//               socket stream is non-blocking, may return false if
+//               the data is not available; otherwise, returns false
+//               only if the socket closes.
+////////////////////////////////////////////////////////////////////
+bool SSReader::
+receive_datagram(Datagram &dg) {
+#ifdef SIMULATE_NETWORK_DELAY
+  if (_delay_active) {
+    while (do_receive_datagram(dg)) {
+      delay_datagram(dg);
+    }
+    return get_delayed(dg);
+  }
+
+  // Pick up any datagrams that might have been leftover in the queue
+  // when we disabled the delay.
+  if (get_delayed(dg)) {
+    return true;
+  }
+#endif  // SIMULATE_NETWORK_DELAY
+
+  return do_receive_datagram(dg);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SSWriter::Constructor
 //       Access: Public
 //       Access: Public
 //  Description:
 //  Description:
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE OSocketStream::
-OSocketStream(streambuf *buf) : ostream(buf) {
+INLINE SSWriter::
+SSWriter(ostream *stream) : _ostream(stream) {
   _collect_tcp = collect_tcp;
   _collect_tcp = collect_tcp;
   _collect_tcp_interval = collect_tcp_interval;
   _collect_tcp_interval = collect_tcp_interval;
   _queued_data_start = 0.0;
   _queued_data_start = 0.0;
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: OSocketStream::set_collect_tcp
+//     Function: SSWriter::set_collect_tcp
 //       Access: Published
 //       Access: Published
 //  Description: Enables or disables "collect-tcp" mode.  In this
 //  Description: Enables or disables "collect-tcp" mode.  In this
 //               mode, individual TCP packets are not sent
 //               mode, individual TCP packets are not sent
@@ -60,24 +95,24 @@ OSocketStream(streambuf *buf) : ostream(buf) {
 //               periodically call consider_flush() to flush the queue
 //               periodically call consider_flush() to flush the queue
 //               if no packets have been sent recently.
 //               if no packets have been sent recently.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE void OSocketStream::
+INLINE void SSWriter::
 set_collect_tcp(bool collect_tcp) {
 set_collect_tcp(bool collect_tcp) {
   _collect_tcp = collect_tcp;
   _collect_tcp = collect_tcp;
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: OSocketStream::get_collect_tcp
+//     Function: SSWriter::get_collect_tcp
 //       Access: Published
 //       Access: Published
 //  Description: Returns the current setting of "collect-tcp" mode.
 //  Description: Returns the current setting of "collect-tcp" mode.
 //               See set_collect_tcp().
 //               See set_collect_tcp().
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE bool OSocketStream::
+INLINE bool SSWriter::
 get_collect_tcp() const {
 get_collect_tcp() const {
   return _collect_tcp;
   return _collect_tcp;
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: OSocketStream::set_collect_tcp_interval
+//     Function: SSWriter::set_collect_tcp_interval
 //       Access: Published
 //       Access: Published
 //  Description: Specifies the interval in time, in seconds, for which
 //  Description: Specifies the interval in time, in seconds, for which
 //               to hold TCP packets before sending all of the
 //               to hold TCP packets before sending all of the
@@ -85,13 +120,13 @@ get_collect_tcp() const {
 //               meaning if "collect-tcp" mode is enabled; see
 //               meaning if "collect-tcp" mode is enabled; see
 //               set_collect_tcp().
 //               set_collect_tcp().
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE void OSocketStream::
+INLINE void SSWriter::
 set_collect_tcp_interval(double interval) {
 set_collect_tcp_interval(double interval) {
   _collect_tcp_interval = interval;
   _collect_tcp_interval = interval;
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: OSocketStream::get_collect_tcp_interval
+//     Function: SSWriter::get_collect_tcp_interval
 //       Access: Published
 //       Access: Published
 //  Description: Returns the interval in time, in seconds, for which
 //  Description: Returns the interval in time, in seconds, for which
 //               to hold TCP packets before sending all of the
 //               to hold TCP packets before sending all of the
@@ -99,19 +134,19 @@ set_collect_tcp_interval(double interval) {
 //               meaning if "collect-tcp" mode is enabled; see
 //               meaning if "collect-tcp" mode is enabled; see
 //               set_collect_tcp().
 //               set_collect_tcp().
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE double OSocketStream::
+INLINE double SSWriter::
 get_collect_tcp_interval() const {
 get_collect_tcp_interval() const {
   return _collect_tcp_interval;
   return _collect_tcp_interval;
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: OSocketStream::consider_flush
+//     Function: SSWriter::consider_flush
 //       Access: Published
 //       Access: Published
 //  Description: Sends the most recently queued data if enough time
 //  Description: Sends the most recently queued data if enough time
 //               has elapsed.  This only has meaning if
 //               has elapsed.  This only has meaning if
 //               set_collect_tcp() has been set to true.
 //               set_collect_tcp() has been set to true.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE bool OSocketStream::
+INLINE bool SSWriter::
 consider_flush() {
 consider_flush() {
   if (!_collect_tcp) {
   if (!_collect_tcp) {
     return flush();
     return flush();
@@ -130,120 +165,56 @@ consider_flush() {
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: OSocketStream::flush
+//     Function: SSWriter::flush
 //       Access: Published
 //       Access: Published
 //  Description: Sends the most recently queued data now.  This only
 //  Description: Sends the most recently queued data now.  This only
 //               has meaning if set_collect_tcp() has been set to
 //               has meaning if set_collect_tcp() has been set to
 //               true.
 //               true.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE bool OSocketStream::
+INLINE bool SSWriter::
 flush() {
 flush() {
-  ostream::flush();
+  _ostream->flush();
   _queued_data_start = ClockObject::get_global_clock()->get_real_time();
   _queued_data_start = ClockObject::get_global_clock()->get_real_time();
   return !is_closed();
   return !is_closed();
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: SocketStream::Constructor
+//     Function: ISocketStream::Constructor
 //       Access: Public
 //       Access: Public
 //  Description:
 //  Description:
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE SocketStream::
-SocketStream(streambuf *buf) : iostream(buf) {
-  _data_expected = 0;
-  _collect_tcp = collect_tcp;
-  _collect_tcp_interval = collect_tcp_interval;
-  _queued_data_start = 0.0;
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: SocketStream::set_collect_tcp
-//       Access: Published
-//  Description: Enables or disables "collect-tcp" mode.  In this
-//               mode, individual TCP packets are not sent
-//               immediately, but rather they are collected together
-//               and accumulated to be sent periodically as one larger
-//               TCP packet.  This cuts down on overhead from the
-//               TCP/IP protocol, especially if many small packets
-//               need to be sent on the same connection, but it
-//               introduces additional latency (since packets must be
-//               held before they can be sent).
-//
-//               See set_collect_tcp_interval() to specify the
-//               interval of time for which to hold packets before
-//               sending them.
-//
-//               If you enable this mode, you may also need to
-//               periodically call consider_flush() to flush the queue
-//               if no packets have been sent recently.
-////////////////////////////////////////////////////////////////////
-INLINE void SocketStream::
-set_collect_tcp(bool collect_tcp) {
-  _collect_tcp = collect_tcp;
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: SocketStream::get_collect_tcp
-//       Access: Published
-//  Description: Returns the current setting of "collect-tcp" mode.
-//               See set_collect_tcp().
-////////////////////////////////////////////////////////////////////
-INLINE bool SocketStream::
-get_collect_tcp() const {
-  return _collect_tcp;
+INLINE ISocketStream::
+ISocketStream(streambuf *buf) : istream(buf), SSReader(this) {
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: SocketStream::set_collect_tcp_interval
-//       Access: Published
-//  Description: Specifies the interval in time, in seconds, for which
-//               to hold TCP packets before sending all of the
-//               recently received packets at once.  This only has
-//               meaning if "collect-tcp" mode is enabled; see
-//               set_collect_tcp().
+//     Function: OSocketStream::Constructor
+//       Access: Public
+//  Description:
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE void SocketStream::
-set_collect_tcp_interval(double interval) {
-  _collect_tcp_interval = interval;
+INLINE OSocketStream::
+OSocketStream(streambuf *buf) : ostream(buf), SSWriter(this) {
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: SocketStream::get_collect_tcp_interval
+//     Function: OSocketStream::flush
 //       Access: Published
 //       Access: Published
-//  Description: Returns the interval in time, in seconds, for which
-//               to hold TCP packets before sending all of the
-//               recently received packets at once.  This only has
-//               meaning if "collect-tcp" mode is enabled; see
-//               set_collect_tcp().
+//  Description: Sends the most recently queued data now.  This only
+//               has meaning if set_collect_tcp() has been set to
+//               true.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE double SocketStream::
-get_collect_tcp_interval() const {
-  return _collect_tcp_interval;
+INLINE bool OSocketStream::
+flush() {
+  return SSWriter::flush();
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: SocketStream::consider_flush
-//       Access: Published
-//  Description: Sends the most recently queued data if enough time
-//               has elapsed.  This only has meaning if
-//               set_collect_tcp() has been set to true.
+//     Function: SocketStream::Constructor
+//       Access: Public
+//  Description:
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-INLINE bool SocketStream::
-consider_flush() {
-  if (!_collect_tcp) {
-    return flush();
-
-  } else {
-    double elapsed = 
-      ClockObject::get_global_clock()->get_real_time() - _queued_data_start;
-    // If the elapsed time is negative, someone must have reset the
-    // clock back, so just go ahead and flush.
-    if (elapsed < 0.0 || elapsed >= _collect_tcp_interval) {
-      return flush();
-    }
-  }
-
-  return true;
+INLINE SocketStream::
+SocketStream(streambuf *buf) : iostream(buf), SSReader(this), SSWriter(this) {
 }
 }
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
@@ -255,7 +226,5 @@ consider_flush() {
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 INLINE bool SocketStream::
 INLINE bool SocketStream::
 flush() {
 flush() {
-  iostream::flush();
-  _queued_data_start = ClockObject::get_global_clock()->get_real_time();
-  return !is_closed();
+  return SSWriter::flush();
 }
 }

+ 109 - 70
panda/src/downloader/socketStream.cxx

@@ -23,22 +23,31 @@
 #ifdef HAVE_SSL
 #ifdef HAVE_SSL
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: ISocketStream::receive_datagram
-//       Access: Public
+//     Function: SSReader::Destructor
+//       Access: Public, Virtual
+//  Description: 
+////////////////////////////////////////////////////////////////////
+SSReader::
+~SSReader() {
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SSReader::do_receive_datagram
+//       Access: Private
 //  Description: Receives a datagram over the socket by expecting a
 //  Description: Receives a datagram over the socket by expecting a
 //               little-endian 16-bit byte count as a prefix.  If the
 //               little-endian 16-bit byte count as a prefix.  If the
 //               socket stream is non-blocking, may return false if
 //               socket stream is non-blocking, may return false if
 //               the data is not available; otherwise, returns false
 //               the data is not available; otherwise, returns false
 //               only if the socket closes.
 //               only if the socket closes.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-bool ISocketStream::
-receive_datagram(Datagram &dg) {
+bool SSReader::
+do_receive_datagram(Datagram &dg) {
   if (_data_expected == 0) {
   if (_data_expected == 0) {
     // Read the first two bytes: the datagram length.
     // Read the first two bytes: the datagram length.
     while (_data_so_far.length() < 2) {
     while (_data_so_far.length() < 2) {
-      int ch = get();
-      if (eof()) {
-        clear();
+      int ch = _istream->get();
+      if (_istream->eof()) {
+        _istream->clear();
         return false;
         return false;
       }
       }
       _data_so_far += (char)ch;
       _data_so_far += (char)ch;
@@ -58,9 +67,9 @@ receive_datagram(Datagram &dg) {
 
 
   // Read the next n bytes until the datagram is filled.
   // Read the next n bytes until the datagram is filled.
   while (_data_so_far.length() < _data_expected) {
   while (_data_so_far.length() < _data_expected) {
-    int ch = get();
-    if (eof()) {
-      clear();
+    int ch = _istream->get();
+    if (_istream->eof()) {
+      _istream->clear();
       return false;
       return false;
     }
     }
     _data_so_far += (char)ch;
     _data_so_far += (char)ch;
@@ -71,83 +80,111 @@ receive_datagram(Datagram &dg) {
 
 
   _data_expected = 0;
   _data_expected = 0;
   _data_so_far = string();
   _data_so_far = string();
+
   return true;
   return true;
 }
 }
 
 
+#ifdef SIMULATE_NETWORK_DELAY
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: OSocketStream::send_datagram
-//       Access: Public
-//  Description: Transmits the indicated datagram over the socket by
-//               prepending it with a little-endian 16-bit byte count.
-//               Does not return until the data is sent or the
-//               connection is closed, even if the socket stream is
-//               non-blocking.
+//     Function: SSReader::start_delay
+//       Access: Published
+//  Description: Enables a simulated network latency.  All datagrams
+//               received from this point on will be held for a random
+//               interval of least min_delay seconds, and no more than
+//               max_delay seconds, before being visible.  It is as if
+//               datagrams suddenly took much longer to arrive.
+//
+//               This should *only* be called if the underlying socket
+//               is non-blocking.  If you call this on a blocking
+//               socket, it will force all datagrams to be held up
+//               until the socket closes.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-bool OSocketStream::
-send_datagram(const Datagram &dg) {
-  Datagram header;
-  header.add_uint16(dg.get_length());
-  write((const char *)header.get_data(), header.get_length());
-  write((const char *)dg.get_data(), dg.get_length());
-  flush();
-
-  return !is_closed();
+void SSReader::
+start_delay(double min_delay, double max_delay) {
+  _min_delay = min_delay;
+  _delay_variance = max(max_delay - min_delay, 0.0);
+  _delay_active = true;
 }
 }
+#endif  // SIMULATE_NETWORK_DELAY
 
 
+#ifdef SIMULATE_NETWORK_DELAY
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: SocketStream::receive_datagram
-//       Access: Public
-//  Description: Receives a datagram over the socket by expecting a
-//               little-endian 16-bit byte count as a prefix.  If the
-//               socket stream is non-blocking, may return false if
-//               the data is not available; otherwise, returns false
-//               only if the socket closes.
+//     Function: SSReader::stop_delay
+//       Access: Published
+//  Description: Disables the simulated network latency started by a
+//               previous call to start_delay().  Datagrams will once
+//               again be visible as soon as they are received.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-bool SocketStream::
-receive_datagram(Datagram &dg) {
-  if (_data_expected == 0) {
-    // Read the first two bytes: the datagram length.
-    while (_data_so_far.length() < 2) {
-      int ch = get();
-      if (eof()) {
-        clear();
-        return false;
-      }
-      _data_so_far += (char)ch;
-    }
-
-    Datagram header(_data_so_far);
-    DatagramIterator di(header);
-    _data_expected = di.get_uint16();
-    _data_so_far = string();
+void SSReader::
+stop_delay() {
+  _delay_active = false;
+}
+#endif  // SIMULATE_NETWORK_DELAY
 
 
-    if (_data_expected == 0) {
-      // Empty datagram.
-      dg.clear();
-      return true;
-    }
+#ifdef SIMULATE_NETWORK_DELAY
+////////////////////////////////////////////////////////////////////
+//     Function: SSReader::delay_datagram
+//       Access: Private
+//  Description: Adds the datagram to the delay queue for a random
+//               time interval.
+////////////////////////////////////////////////////////////////////
+void SSReader::
+delay_datagram(const Datagram &datagram) {
+  nassertv(_delay_active);
+
+  double now = ClockObject::get_global_clock()->get_frame_time();
+  double reveal_time = now + _min_delay;
+  
+  if (_delay_variance > 0.0) {
+    reveal_time += _delay_variance * ((double)rand() / (double)RAND_MAX);
   }
   }
+  _delayed.push_back(DelayedDatagram());
+  DelayedDatagram &dd = _delayed.back();
+  dd._reveal_time = reveal_time;
+  dd._datagram = datagram;
+}
+#endif  // SIMULATE_NETWORK_DELAY
 
 
-  // Read the next n bytes until the datagram is filled.
-  while (_data_so_far.length() < _data_expected) {
-    int ch = get();
-    if (eof()) {
-      clear();
+#ifdef SIMULATE_NETWORK_DELAY
+////////////////////////////////////////////////////////////////////
+//     Function: SSReader::get_delayed
+//       Access: Private
+//  Description: Checks the delayed queue for any now available
+//               datagrams.  If any are available, returns true and
+//               fills datagram with its value.
+////////////////////////////////////////////////////////////////////
+bool SSReader::
+get_delayed(Datagram &datagram) {
+  if (_delayed.empty()) {
+    return false;
+  }
+  const DelayedDatagram &dd = _delayed.front();
+  if (_delay_active) {
+    double now = ClockObject::get_global_clock()->get_frame_time();
+    if (dd._reveal_time > now) {
+      // Not yet.
       return false;
       return false;
     }
     }
-    _data_so_far += (char)ch;
   }
   }
 
 
-  dg.clear();
-  dg.append_data(_data_so_far);
+  datagram = dd._datagram;
+  _delayed.pop_front();
 
 
-  _data_expected = 0;
-  _data_so_far = string();
   return true;
   return true;
 }
 }
+#endif  // SIMULATE_NETWORK_DELAY
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//     Function: SocketStream::send_datagram
+//     Function: SSWriter::Destructor
+//       Access: Public, Virtual
+//  Description: 
+////////////////////////////////////////////////////////////////////
+SSWriter::
+~SSWriter() {
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SSWriter::send_datagram
 //       Access: Public
 //       Access: Public
 //  Description: Transmits the indicated datagram over the socket by
 //  Description: Transmits the indicated datagram over the socket by
 //               prepending it with a little-endian 16-bit byte count.
 //               prepending it with a little-endian 16-bit byte count.
@@ -155,19 +192,21 @@ receive_datagram(Datagram &dg) {
 //               connection is closed, even if the socket stream is
 //               connection is closed, even if the socket stream is
 //               non-blocking.
 //               non-blocking.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-bool SocketStream::
+bool SSWriter::
 send_datagram(const Datagram &dg) {
 send_datagram(const Datagram &dg) {
   Datagram header;
   Datagram header;
   header.add_uint16(dg.get_length());
   header.add_uint16(dg.get_length());
 
 
   // These two writes don't generate two socket calls, because the
   // These two writes don't generate two socket calls, because the
   // socket stream is always buffered.
   // socket stream is always buffered.
-  write((const char *)header.get_data(), header.get_length());
-  write((const char *)dg.get_data(), dg.get_length()); 
+  _ostream->write((const char *)header.get_data(), header.get_length());
+  _ostream->write((const char *)dg.get_data(), dg.get_length());
 
 
   // Now flush the buffer immediately, forcing the data to be sent
   // Now flush the buffer immediately, forcing the data to be sent
   // (unless collect-tcp mode is in effect).
   // (unless collect-tcp mode is in effect).
-  return consider_flush();
+  flush();
+
+  return !is_closed();
 }
 }
 
 
 #endif  // HAVE_SSL
 #endif  // HAVE_SSL

+ 83 - 38
panda/src/downloader/socketStream.h

@@ -22,6 +22,8 @@
 #include "pandabase.h"
 #include "pandabase.h"
 #include "clockObject.h"
 #include "clockObject.h"
 #include "config_express.h" // for collect_tcp
 #include "config_express.h" // for collect_tcp
+#include "datagram.h"
+#include "pdeque.h"
 
 
 // At the present, this module is not compiled if OpenSSL is not
 // At the present, this module is not compiled if OpenSSL is not
 // available, since the only current use for it is to implement
 // available, since the only current use for it is to implement
@@ -29,43 +31,65 @@
 
 
 #ifdef HAVE_SSL
 #ifdef HAVE_SSL
 
 
-class Datagram;
-
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//       Class : ISocketStream
-// Description : This is a base class for istreams implemented in
-//               Panda that read from a (possibly non-blocking)
-//               socket.  It adds is_closed(), which can be called
-//               after an eof condition to check whether the socket
-//               has been closed, or whether more data may be
-//               available later.
+//       Class : SSReader
+// Description : An internal class for reading from a socket stream.
+//               This serves as a base class for both ISocketStream
+//               and SocketStream; its purpose is to minimize
+//               redundant code between them.  Do not use it directly.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-class EXPCL_PANDAEXPRESS ISocketStream : public istream {
+class EXPCL_PANDAEXPRESS SSReader {
 public:
 public:
-  INLINE ISocketStream(streambuf *buf);
+  INLINE SSReader(istream *stream);
+  virtual ~SSReader();
 
 
 PUBLISHED:
 PUBLISHED:
-  bool receive_datagram(Datagram &dg);
+  INLINE bool receive_datagram(Datagram &dg);
 
 
   virtual bool is_closed() = 0;
   virtual bool is_closed() = 0;
   virtual void close() = 0;
   virtual void close() = 0;
 
 
 private:
 private:
+  bool do_receive_datagram(Datagram &dg);
+
+  istream *_istream;
   size_t _data_expected;
   size_t _data_expected;
   string _data_so_far;
   string _data_so_far;
+
+#ifdef SIMULATE_NETWORK_DELAY
+PUBLISHED:
+  void start_delay(double min_delay, double max_delay);
+  void stop_delay();
+
+private:
+  void delay_datagram(const Datagram &datagram);
+  bool get_delayed(Datagram &datagram);
+
+  class DelayedDatagram {
+  public:
+    double _reveal_time;
+    Datagram _datagram;
+  };
+    
+  typedef pdeque<DelayedDatagram> Delayed;
+  Delayed _delayed;
+  bool _delay_active;
+  double _min_delay, _delay_variance;
+
+#endif  // SIMULATE_NETWORK_DELAY
 };
 };
 
 
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-//       Class : OSocketStream
-// Description : A base class for ostreams that write to a (possibly
-//               non-blocking) socket.  It adds is_closed(), which can
-//               be called after any write operation fails to check
-//               whether the socket has been closed, or whether more
-//               data may be sent later.
+//       Class : SSWriter
+// Description : An internal class for writing to a socket stream.
+//               This serves as a base class for both OSocketStream
+//               and SocketStream; its purpose is to minimize
+//               redundant code between them.  Do not use it directly.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-class EXPCL_PANDAEXPRESS OSocketStream : public ostream {
+class EXPCL_PANDAEXPRESS SSWriter {
 public:
 public:
-  INLINE OSocketStream(streambuf *buf);
+  INLINE SSWriter(ostream *stream);
+  virtual ~SSWriter();
 
 
 PUBLISHED:
 PUBLISHED:
   bool send_datagram(const Datagram &dg);
   bool send_datagram(const Datagram &dg);
@@ -82,42 +106,63 @@ PUBLISHED:
   INLINE bool flush();
   INLINE bool flush();
 
 
 private:
 private:
+  ostream *_ostream;
   bool _collect_tcp;
   bool _collect_tcp;
   double _collect_tcp_interval;
   double _collect_tcp_interval;
   double _queued_data_start;
   double _queued_data_start;
 };
 };
 
 
+////////////////////////////////////////////////////////////////////
+//       Class : ISocketStream
+// Description : This is a base class for istreams implemented in
+//               Panda that read from a (possibly non-blocking)
+//               socket.  It adds is_closed(), which can be called
+//               after an eof condition to check whether the socket
+//               has been closed, or whether more data may be
+//               available later.
+////////////////////////////////////////////////////////////////////
+class EXPCL_PANDAEXPRESS ISocketStream : public istream, public SSReader {
+public:
+  INLINE ISocketStream(streambuf *buf);
+
+PUBLISHED:
+  virtual bool is_closed() = 0;
+  virtual void close() = 0;
+};
+
+////////////////////////////////////////////////////////////////////
+//       Class : OSocketStream
+// Description : A base class for ostreams that write to a (possibly
+//               non-blocking) socket.  It adds is_closed(), which can
+//               be called after any write operation fails to check
+//               whether the socket has been closed, or whether more
+//               data may be sent later.
+////////////////////////////////////////////////////////////////////
+class EXPCL_PANDAEXPRESS OSocketStream : public ostream, public SSWriter {
+public:
+  INLINE OSocketStream(streambuf *buf);
+
+PUBLISHED:
+  virtual bool is_closed() = 0;
+  virtual void close() = 0;
+
+  INLINE bool flush();
+};
+
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //       Class : SocketStream
 //       Class : SocketStream
 // Description : A base class for iostreams that read and write to a
 // Description : A base class for iostreams that read and write to a
 //               (possibly non-blocking) socket.
 //               (possibly non-blocking) socket.
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
-class EXPCL_PANDAEXPRESS SocketStream : public iostream {
+class EXPCL_PANDAEXPRESS SocketStream : public iostream, public SSReader, public SSWriter {
 public:
 public:
   INLINE SocketStream(streambuf *buf);
   INLINE SocketStream(streambuf *buf);
 
 
 PUBLISHED:
 PUBLISHED:
-  bool receive_datagram(Datagram &dg);
-  bool send_datagram(const Datagram &dg);
-
   virtual bool is_closed() = 0;
   virtual bool is_closed() = 0;
   virtual void close() = 0;
   virtual void close() = 0;
 
 
-  INLINE void set_collect_tcp(bool collect_tcp);
-  INLINE bool get_collect_tcp() const;
-  INLINE void set_collect_tcp_interval(double interval);
-  INLINE double get_collect_tcp_interval() const;
-
-  INLINE bool consider_flush();
   INLINE bool flush();
   INLINE bool flush();
-
-private:
-  size_t _data_expected;
-  string _data_so_far;
-
-  bool _collect_tcp;
-  double _collect_tcp_interval;
-  double _queued_data_start;
 };
 };
 
 
 
 

+ 2 - 2
panda/src/net/queuedConnectionReader.cxx

@@ -192,7 +192,7 @@ void QueuedConnectionReader::
 get_delayed() {
 get_delayed() {
   if (_delay_active) {
   if (_delay_active) {
     PR_Lock(_dd_mutex);
     PR_Lock(_dd_mutex);
-    double now = ClockObject::get_global_clock()->get_real_time();
+    double now = ClockObject::get_global_clock()->get_frame_time();
     while (!_delayed.empty()) {
     while (!_delayed.empty()) {
       const DelayedDatagram &dd = _delayed.front();
       const DelayedDatagram &dd = _delayed.front();
       if (dd._reveal_time > now) {
       if (dd._reveal_time > now) {
@@ -233,7 +233,7 @@ delay_datagram(const NetDatagram &datagram) {
       }
       }
 
 
     } else {
     } else {
-      double now = ClockObject::get_global_clock()->get_real_time();
+      double now = ClockObject::get_global_clock()->get_frame_time();
       double reveal_time = now + _min_delay;
       double reveal_time = now + _min_delay;
       
       
       if (_delay_variance > 0.0) {
       if (_delay_variance > 0.0) {

+ 0 - 8
panda/src/net/queuedConnectionReader.h

@@ -30,14 +30,6 @@
 
 
 EXPORT_TEMPLATE_CLASS(EXPCL_PANDA, EXPTP_PANDA, QueuedReturn<NetDatagram>);
 EXPORT_TEMPLATE_CLASS(EXPCL_PANDA, EXPTP_PANDA, QueuedReturn<NetDatagram>);
 
 
-#ifndef NDEBUG
-// We define this variable if we're compiling code to implement a
-// simulated network latency, useful for debuggin networked programs.
-// Normally, since this is a debugging tool, we wouldn't compile this
-// feature in if we're building a program for public release.
-#define SIMULATE_NETWORK_DELAY
-#endif
-
 ////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////
 //       Class : QueuedConnectionReader
 //       Class : QueuedConnectionReader
 // Description : This flavor of ConnectionReader will read from its
 // Description : This flavor of ConnectionReader will read from its