Browse Source

add collect-tcp

David Rose 23 years ago
parent
commit
00b3e0ca78

+ 8 - 0
direct/src/distributed/ClientRepository.py

@@ -208,6 +208,14 @@ class ClientRepository(DirectObject.DirectObject):
         # to None; enforce that condition
         if not self.tcpConn:
             return 0
+
+        # Make sure any recently-sent datagrams are flushed when the
+        # time expires, if we're in collect-tcp mode.
+        # Temporary try .. except for old Pandas.
+        try:
+            self.tcpConn.considerFlush()
+        except:
+            pass
         
         if self.connectHttp:
             datagram = Datagram()

+ 5 - 2
panda/src/downloader/httpChannel.cxx

@@ -1755,14 +1755,17 @@ http_getline(string &str) {
 //       Access: Private
 //  Description: Sends a series of lines to the server.  Returns true
 //               if the buffer is fully sent, or false if some of it
-//               remains.
+//               remains.  If this returns false, the function must be
+//               called again later, passing in the exact same string,
+//               until the return value is true.
 ////////////////////////////////////////////////////////////////////
 bool HTTPChannel::
 http_send(const string &str) {
   nassertr(str.length() > _sent_so_far, true);
 
   // Use the underlying BIO to write to the server, instead of the
-  // BIOStream, which would insist on blocking.
+  // BIOStream, which would insist on blocking (and might furthermore
+  // delay the send due to collect-tcp mode being enabled).
   size_t bytes_to_send = str.length() - _sent_so_far;
   int write_count =
     BIO_write(*_bio, str.data() + _sent_so_far, bytes_to_send);

+ 7 - 2
panda/src/downloader/httpClient.cxx

@@ -319,8 +319,13 @@ clear_expected_servers() {
 //  Description: Returns a new HTTPChannel object that may be used
 //               for reading multiple documents using the same
 //               connection, for greater network efficiency than
-//               calling HTTPClient::get_document() repeatedly (and
-//               thus forcing a new connection for each document).
+//               calling HTTPClient::get_document() repeatedly (which
+//               would force a new connection for each document).
+//
+//               Also, HTTPChannel has some additional, less common
+//               interface methods than the basic interface methods
+//               that exist on HTTPClient; if you wish to call any of
+//               these methods you must first obtain an HTTPChannel.
 //
 //               Pass true for persistent_connection to gain this
 //               network efficiency.  If, on the other hand, your

+ 196 - 0
panda/src/downloader/socketStream.I

@@ -34,6 +34,104 @@ ISocketStream(streambuf *buf) : istream(buf) {
 ////////////////////////////////////////////////////////////////////
 INLINE OSocketStream::
 OSocketStream(streambuf *buf) : ostream(buf) {
+  _collect_tcp = collect_tcp;
+  _collect_tcp_interval = collect_tcp_interval;
+  _queued_data_start = 0.0;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: OSocketStream::set_collect_tcp
+//       Access: Published
+//  Description: Enables or disables "collect-tcp" mode.  In this
+//               mode, individual TCP packets are not sent
+//               immediately, but rather they are collected together
+//               and accumulated to be sent periodically as one larger
+//               TCP packet.  This cuts down on overhead from the
+//               TCP/IP protocol, especially if many small packets
+//               need to be sent on the same connection, but it
+//               introduces additional latency (since packets must be
+//               held before they can be sent).
+//
+//               See set_collect_tcp_interval() to specify the
+//               interval of time for which to hold packets before
+//               sending them.
+//
+//               If you enable this mode, you may also need to
+//               periodically call consider_flush() to flush the queue
+//               if no packets have been sent recently.
+////////////////////////////////////////////////////////////////////
+INLINE void OSocketStream::
+set_collect_tcp(bool collect_tcp) {
+  _collect_tcp = collect_tcp;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: OSocketStream::get_collect_tcp
+//       Access: Published
+//  Description: Returns the current setting of "collect-tcp" mode.
+//               See set_collect_tcp().
+////////////////////////////////////////////////////////////////////
+INLINE bool OSocketStream::
+get_collect_tcp() const {
+  return _collect_tcp;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: OSocketStream::set_collect_tcp_interval
+//       Access: Published
+//  Description: Specifies the interval in time, in seconds, for which
+//               to hold TCP packets before sending all of the
+//               recently received packets at once.  This only has
+//               meaning if "collect-tcp" mode is enabled; see
+//               set_collect_tcp().
+////////////////////////////////////////////////////////////////////
+INLINE void OSocketStream::
+set_collect_tcp_interval(double interval) {
+  _collect_tcp_interval = interval;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: OSocketStream::get_collect_tcp_interval
+//       Access: Published
+//  Description: Returns the interval in time, in seconds, for which
+//               to hold TCP packets before sending all of the
+//               recently received packets at once.  This only has
+//               meaning if "collect-tcp" mode is enabled; see
+//               set_collect_tcp().
+////////////////////////////////////////////////////////////////////
+INLINE double OSocketStream::
+get_collect_tcp_interval() const {
+  return _collect_tcp_interval;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: OSocketStream::consider_flush
+//       Access: Published
+//  Description: Sends the most recently queued data if enough time
+//               has elapsed.  This only has meaning if
+//               set_collect_tcp() has been set to true.
+////////////////////////////////////////////////////////////////////
+INLINE bool OSocketStream::
+consider_flush() {
+  if (!_collect_tcp || 
+      ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) {
+    return flush();
+  }
+  return true;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: OSocketStream::flush
+//       Access: Published
+//  Description: Sends the most recently queued data now.  This only
+//               has meaning if set_collect_tcp() has been set to
+//               true.
+////////////////////////////////////////////////////////////////////
+bool OSocketStream::
+flush() {
+  ostream::flush();
+  _queued_data_start = ClockObject::get_global_clock()->get_real_time();
+  return !is_closed();
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -44,4 +142,102 @@ OSocketStream(streambuf *buf) : ostream(buf) {
 INLINE SocketStream::
 SocketStream(streambuf *buf) : iostream(buf) {
   _data_expected = 0;
+  _collect_tcp = collect_tcp;
+  _collect_tcp_interval = collect_tcp_interval;
+  _queued_data_start = 0.0;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SocketStream::set_collect_tcp
+//       Access: Published
+//  Description: Enables or disables "collect-tcp" mode.  In this
+//               mode, individual TCP packets are not sent
+//               immediately, but rather they are collected together
+//               and accumulated to be sent periodically as one larger
+//               TCP packet.  This cuts down on overhead from the
+//               TCP/IP protocol, especially if many small packets
+//               need to be sent on the same connection, but it
+//               introduces additional latency (since packets must be
+//               held before they can be sent).
+//
+//               See set_collect_tcp_interval() to specify the
+//               interval of time for which to hold packets before
+//               sending them.
+//
+//               If you enable this mode, you may also need to
+//               periodically call consider_flush() to flush the queue
+//               if no packets have been sent recently.
+////////////////////////////////////////////////////////////////////
+INLINE void SocketStream::
+set_collect_tcp(bool collect_tcp) {
+  _collect_tcp = collect_tcp;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SocketStream::get_collect_tcp
+//       Access: Published
+//  Description: Returns the current setting of "collect-tcp" mode.
+//               See set_collect_tcp().
+////////////////////////////////////////////////////////////////////
+INLINE bool SocketStream::
+get_collect_tcp() const {
+  return _collect_tcp;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SocketStream::set_collect_tcp_interval
+//       Access: Published
+//  Description: Specifies the interval in time, in seconds, for which
+//               to hold TCP packets before sending all of the
+//               recently received packets at once.  This only has
+//               meaning if "collect-tcp" mode is enabled; see
+//               set_collect_tcp().
+////////////////////////////////////////////////////////////////////
+INLINE void SocketStream::
+set_collect_tcp_interval(double interval) {
+  _collect_tcp_interval = interval;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SocketStream::get_collect_tcp_interval
+//       Access: Published
+//  Description: Returns the interval in time, in seconds, for which
+//               to hold TCP packets before sending all of the
+//               recently received packets at once.  This only has
+//               meaning if "collect-tcp" mode is enabled; see
+//               set_collect_tcp().
+////////////////////////////////////////////////////////////////////
+INLINE double SocketStream::
+get_collect_tcp_interval() const {
+  return _collect_tcp_interval;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SocketStream::consider_flush
+//       Access: Published
+//  Description: Sends the most recently queued data if enough time
+//               has elapsed.  This only has meaning if
+//               set_collect_tcp() has been set to true.
+////////////////////////////////////////////////////////////////////
+INLINE bool SocketStream::
+consider_flush() {
+  if (!_collect_tcp || 
+      ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) {
+    return flush();
+  }
+  return true;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: SocketStream::flush
+//       Access: Published
+//  Description: Sends the most recently queued data now.  This only
+//               has meaning if set_collect_tcp() has been set to
+//               true.
+////////////////////////////////////////////////////////////////////
+bool SocketStream::
+flush() {
+  iostream::flush();
+  _queued_data_start = ClockObject::get_global_clock()->get_real_time();
+  return !is_closed();
 }

+ 6 - 2
panda/src/downloader/socketStream.cxx

@@ -159,11 +159,15 @@ bool SocketStream::
 send_datagram(const Datagram &dg) {
   Datagram header;
   header.add_uint16(dg.get_length());
+
+  // These two writes don't generate two socket calls, because the
+  // socket stream is always buffered.
   write((const char *)header.get_data(), header.get_length());
   write((const char *)dg.get_data(), dg.get_length()); 
-  flush();
 
-  return !is_closed();
+  // Now flush the buffer immediately, forcing the data to be sent
+  // (unless collect-tcp mode is in effect).
+  return consider_flush();
 }
 
 #endif  // HAVE_SSL

+ 27 - 0
panda/src/downloader/socketStream.h

@@ -20,6 +20,8 @@
 #define SOCKETSTREAM_H
 
 #include "pandabase.h"
+#include "clockObject.h"
+#include "config_express.h" // for collect_tcp
 
 // At the present, this module is not compiled if OpenSSL is not
 // available, since the only current use for it is to implement
@@ -68,6 +70,19 @@ PUBLISHED:
   bool send_datagram(const Datagram &dg);
 
   virtual bool is_closed() = 0;
+
+  INLINE void set_collect_tcp(bool collect_tcp);
+  INLINE bool get_collect_tcp() const;
+  INLINE void set_collect_tcp_interval(double interval);
+  INLINE double get_collect_tcp_interval() const;
+
+  INLINE bool consider_flush();
+  INLINE bool flush();
+
+private:
+  bool _collect_tcp;
+  double _collect_tcp_interval;
+  double _queued_data_start;
 };
 
 ////////////////////////////////////////////////////////////////////
@@ -85,9 +100,21 @@ PUBLISHED:
 
   virtual bool is_closed() = 0;
 
+  INLINE void set_collect_tcp(bool collect_tcp);
+  INLINE bool get_collect_tcp() const;
+  INLINE void set_collect_tcp_interval(double interval);
+  INLINE double get_collect_tcp_interval() const;
+
+  INLINE bool consider_flush();
+  INLINE bool flush();
+
 private:
   size_t _data_expected;
   string _data_so_far;
+
+  bool _collect_tcp;
+  double _collect_tcp_interval;
+  double _queued_data_start;
 };
 
 

+ 6 - 0
panda/src/express/config_express.cxx

@@ -194,6 +194,12 @@ config_express.GetBool("keep-temporary-files", false);
 // this false, except for testing or if you mistrust the new code.
 const bool use_vfs = config_express.GetBool("use-vfs", true);
 
+// Set this true to enable accumulation of several small consecutive
+// TCP datagrams into one large datagram before sending it, to reduce
+// overhead from the TCP/IP protocol.  See
+// Connection::set_collect_tcp() or SocketStream::set_collect_tcp().
+const bool collect_tcp = config_express.GetBool("collect-tcp", false);
+const double collect_tcp_interval = config_express.GetDouble("collect-tcp-interval", 0.2);
 
 // Returns the configure object for accessing config variables from a
 // scripting language.

+ 3 - 0
panda/src/express/config_express.h

@@ -50,6 +50,9 @@ extern const bool keep_temporary_files;
 
 extern EXPCL_PANDAEXPRESS const bool use_vfs;
 
+extern EXPCL_PANDAEXPRESS const bool collect_tcp;
+extern EXPCL_PANDAEXPRESS const double collect_tcp_interval;
+
 // Expose the Config variable for Python access.
 BEGIN_PUBLISH
 typedef Config::Config<ConfigureGetConfig_config_express> ConfigExpress;

+ 223 - 72
panda/src/net/connection.cxx

@@ -23,12 +23,13 @@
 #include "datagramUDPHeader.h"
 #include "pprerror.h"
 #include "config_net.h"
+#include "config_express.h" // for collect_tcp
+#include "clockObject.h"
 
-#include <prerror.h>
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::Constructor
-//       Access: Public
+//       Access: Published
 //  Description: Creates a connection.  Normally this constructor
 //               should not be used directly by user code; use one of
 //               the methods in ConnectionManager to make a new
@@ -40,11 +41,15 @@ Connection(ConnectionManager *manager, PRFileDesc *socket) :
   _socket(socket)
 {
   _write_mutex = PR_NewLock();
+  _collect_tcp = collect_tcp;
+  _collect_tcp_interval = collect_tcp_interval;
+  _queued_data_start = 0.0;
+  _queued_count = 0;
 }
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::Destructor
-//       Access: Public
+//       Access: Published
 //  Description: Closes a connection.
 ////////////////////////////////////////////////////////////////////
 Connection::
@@ -53,6 +58,8 @@ Connection::
     << "Deleting connection " << (void *)this << "\n";
 
   if (_socket != (PRFileDesc *)NULL) {
+    flush();
+
     PRStatus result = PR_Close(_socket);
     if (result != PR_SUCCESS) {
       pprerror("PR_Close");
@@ -64,7 +71,7 @@ Connection::
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::get_address
-//       Access: Public
+//       Access: Published
 //  Description: Returns the address bound to this connection, if it
 //               is a TCP connection.
 ////////////////////////////////////////////////////////////////////
@@ -80,7 +87,7 @@ get_address() const {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::get_manager
-//       Access: Public
+//       Access: Published
 //  Description: Returns a pointer to the ConnectionManager object
 //               that serves this connection.
 ////////////////////////////////////////////////////////////////////
@@ -91,7 +98,7 @@ get_manager() const {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::get_socket
-//       Access: Public
+//       Access: Published
 //  Description: Returns the internal NSPR pointer that defines the
 //               connection.
 ////////////////////////////////////////////////////////////////////
@@ -100,9 +107,107 @@ get_socket() const {
   return _socket;
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: Connection::set_collect_tcp
+//       Access: Published
+//  Description: Enables or disables "collect-tcp" mode.  In this
+//               mode, individual TCP packets are not sent
+//               immediately, but rather they are collected together
+//               and accumulated to be sent periodically as one larger
+//               TCP packet.  This cuts down on overhead from the
+//               TCP/IP protocol, especially if many small packets
+//               need to be sent on the same connection, but it
+//               introduces additional latency (since packets must be
+//               held before they can be sent).
+//
+//               See set_collect_tcp_interval() to specify the
+//               interval of time for which to hold packets before
+//               sending them.
+//
+//               If you enable this mode, you may also need to
+//               periodically call consider_flush() to flush the queue
+//               if no packets have been sent recently.
+////////////////////////////////////////////////////////////////////
+void Connection::
+set_collect_tcp(bool collect_tcp) {
+  _collect_tcp = collect_tcp;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: Connection::get_collect_tcp
+//       Access: Published
+//  Description: Returns the current setting of "collect-tcp" mode.
+//               See set_collect_tcp().
+////////////////////////////////////////////////////////////////////
+bool Connection::
+get_collect_tcp() const {
+  return _collect_tcp;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: Connection::set_collect_tcp_interval
+//       Access: Published
+//  Description: Specifies the interval in time, in seconds, for which
+//               to hold TCP packets before sending all of the
+//               recently received packets at once.  This only has
+//               meaning if "collect-tcp" mode is enabled; see
+//               set_collect_tcp().
+////////////////////////////////////////////////////////////////////
+void Connection::
+set_collect_tcp_interval(double interval) {
+  _collect_tcp_interval = interval;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: Connection::get_collect_tcp_interval
+//       Access: Published
+//  Description: Returns the interval in time, in seconds, for which
+//               to hold TCP packets before sending all of the
+//               recently received packets at once.  This only has
+//               meaning if "collect-tcp" mode is enabled; see
+//               set_collect_tcp().
+////////////////////////////////////////////////////////////////////
+double Connection::
+get_collect_tcp_interval() const {
+  return _collect_tcp_interval;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: Connection::consider_flush
+//       Access: Published
+//  Description: Sends the most recently queued TCP datagram(s) if
+//               enough time has elapsed.  This only has meaning if
+//               set_collect_tcp() has been set to true.
+////////////////////////////////////////////////////////////////////
+bool Connection::
+consider_flush() {
+  PR_Lock(_write_mutex);
+
+  if (!_collect_tcp || 
+      ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) {
+    return do_flush();
+  }
+
+  PR_Unlock(_write_mutex);
+  return true;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: Connection::flush
+//       Access: Published
+//  Description: Sends the most recently queued TCP datagram(s) now.
+//               This only has meaning if set_collect_tcp() has been
+//               set to true.
+////////////////////////////////////////////////////////////////////
+bool Connection::
+flush() {
+  PR_Lock(_write_mutex);
+  return do_flush();
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_nonblock
-//       Access: Public
+//       Access: Published
 //  Description: Sets whether nonblocking I/O should be in effect.
 ////////////////////////////////////////////////////////////////////
 void Connection::
@@ -115,7 +220,7 @@ set_nonblock(bool flag) {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_linger
-//       Access: Public
+//       Access: Published
 //  Description: Sets the time to linger on close if data is present.
 //               If flag is false, when you close a socket with data
 //               available the system attempts to deliver the data to
@@ -136,7 +241,7 @@ set_linger(bool flag, double time) {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_reuse_addr
-//       Access: Public
+//       Access: Published
 //  Description: Sets whether local address reuse is allowed.
 ////////////////////////////////////////////////////////////////////
 void Connection::
@@ -149,7 +254,7 @@ set_reuse_addr(bool flag) {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_keep_alive
-//       Access: Public
+//       Access: Published
 //  Description: Sets whether the connection is periodically tested to
 //               see if it is still alive.
 ////////////////////////////////////////////////////////////////////
@@ -163,7 +268,7 @@ set_keep_alive(bool flag) {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_recv_buffer_size
-//       Access: Public
+//       Access: Published
 //  Description: Sets the size of the receive buffer, in bytes.
 ////////////////////////////////////////////////////////////////////
 void Connection::
@@ -176,7 +281,7 @@ set_recv_buffer_size(int size) {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_send_buffer_size
-//       Access: Public
+//       Access: Published
 //  Description: Sets the size of the send buffer, in bytes.
 ////////////////////////////////////////////////////////////////////
 void Connection::
@@ -189,7 +294,7 @@ set_send_buffer_size(int size) {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_ip_time_to_live
-//       Access: Public
+//       Access: Published
 //  Description: Sets IP time-to-live.
 ////////////////////////////////////////////////////////////////////
 void Connection::
@@ -202,7 +307,7 @@ set_ip_time_to_live(int ttl) {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_ip_type_of_service
-//       Access: Public
+//       Access: Published
 //  Description: Sets IP type-of-service and precedence.
 ////////////////////////////////////////////////////////////////////
 void Connection::
@@ -215,7 +320,7 @@ set_ip_type_of_service(int tos) {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_no_delay
-//       Access: Public
+//       Access: Published
 //  Description: If flag is true, this disables the Nagle algorithm,
 //               and prevents delaying of send to coalesce packets.
 ////////////////////////////////////////////////////////////////////
@@ -229,7 +334,7 @@ set_no_delay(bool flag) {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: Connection::set_max_segment
-//       Access: Public
+//       Access: Published
 //  Description: Sets the maximum segment size.
 ////////////////////////////////////////////////////////////////////
 void Connection::
@@ -253,65 +358,49 @@ bool Connection::
 send_datagram(const NetDatagram &datagram) {
   nassertr(_socket != (PRFileDesc *)NULL, false);
 
-  PR_Lock(_write_mutex);
-
-  PRInt32 bytes_sent;
-  PRInt32 result;
   if (PR_GetDescType(_socket) == PR_DESC_SOCKET_UDP) {
+    // We have to send UDP right away.
+    PR_Lock(_write_mutex);
     DatagramUDPHeader header(datagram);
-    string data = header.get_header() + datagram.get_message();
-    bytes_sent = data.length();
+    string data;
+    data += header.get_header();
+    data += datagram.get_message();
+
+    PRInt32 bytes_to_send = data.length();
+    PRInt32 result;
     result = PR_SendTo(_socket,
-                       data.data(), bytes_sent,
+                       data.data(), bytes_to_send,
                        0,
                        datagram.get_address().get_addr(),
                        PR_INTERVAL_NO_TIMEOUT);
+    PRErrorCode errcode = PR_GetError();
 
     if (net_cat.is_debug()) {
       header.verify_datagram(datagram);
     }
-  } else {
-    DatagramTCPHeader header(datagram);
-    string data = header.get_header() + datagram.get_message();
-    bytes_sent = data.length();
-    result = PR_Send(_socket,
-                     data.data(), bytes_sent,
-                     0,
-                     PR_INTERVAL_NO_TIMEOUT);
 
-    if (net_cat.is_debug()) {
-      header.verify_datagram(datagram);
-    }
+    PR_Unlock(_write_mutex);
+    return check_send_error(result, errcode, bytes_to_send);
   }
 
-  PRErrorCode errcode = PR_GetError();
-
-  PR_Unlock(_write_mutex);
-
-  if (result < 0) {
-    if (errcode == PR_CONNECT_RESET_ERROR
-#ifdef PR_SOCKET_SHUTDOWN_ERROR
-        || errcode == PR_SOCKET_SHUTDOWN_ERROR
-        || errcode == PR_CONNECT_ABORTED_ERROR
-#endif
-        ) {
-      // The connection has been reset; tell our manager about it
-      // and ignore it.
-      if (_manager != (ConnectionManager *)NULL) {
-        _manager->connection_reset(this);
-      }
-
-    } else if (errcode != PR_PENDING_INTERRUPT_ERROR) {
-      pprerror("PR_SendTo");
-    }
+  // We might queue up TCP packets for later sending.
+  DatagramTCPHeader header(datagram);
 
-    return false;
+  PR_Lock(_write_mutex);
+  _queued_data += header.get_header();
+  _queued_data += datagram.get_message();
+  _queued_count++;
+  
+  if (net_cat.is_debug()) {
+    header.verify_datagram(datagram);
+  }
 
-  } else if (result != bytes_sent) {
-    net_cat.error() << "Not enough bytes sent to socket.\n";
-    return false;
+  if (!_collect_tcp || 
+      ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) {
+    return do_flush();
   }
 
+  PR_Unlock(_write_mutex);
   return true;
 }
 
@@ -326,31 +415,93 @@ bool Connection::
 send_raw_datagram(const NetDatagram &datagram) {
   nassertr(_socket != (PRFileDesc *)NULL, false);
 
-  PR_Lock(_write_mutex);
-
-  PRInt32 bytes_sent;
-  PRInt32 result;
   if (PR_GetDescType(_socket) == PR_DESC_SOCKET_UDP) {
+    // We have to send UDP right away.
+
     string data = datagram.get_message();
-    bytes_sent = data.length();
+    PRInt32 bytes_to_send = data.length();
+
+    if (net_cat.is_spam()) {
+      net_cat.spam()
+        << "Sending UDP datagram with " 
+        << bytes_to_send << " bytes to " << (void *)this << "\n";
+    }
+
+    PR_Lock(_write_mutex);
+    PRInt32 result;
     result = PR_SendTo(_socket,
-                       data.data(), bytes_sent,
+                       data.data(), bytes_to_send,
                        0,
                        datagram.get_address().get_addr(),
                        PR_INTERVAL_NO_TIMEOUT);
-  } else {
-    string data = datagram.get_message();
-    bytes_sent = data.length();
-    result = PR_Send(_socket,
-                     data.data(), bytes_sent,
-                     0,
-                     PR_INTERVAL_NO_TIMEOUT);
+    PRErrorCode errcode = PR_GetError();
+
+    PR_Unlock(_write_mutex);
+    return check_send_error(result, errcode, bytes_to_send);
+  }
+
+  // We might queue up TCP packets for later sending.
+
+  PR_Lock(_write_mutex);
+  _queued_data += datagram.get_message();
+  _queued_count++;
+
+  if (!_collect_tcp || 
+      ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) {
+    return do_flush();
+  }
+
+  PR_Unlock(_write_mutex);
+  return true;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: Connection::do_flush
+//       Access: Private
+//  Description: The private implementation of flush(), this assumes
+//               the _write_mutex has already been locked on entry.
+//               It will be unlocked on return.
+////////////////////////////////////////////////////////////////////
+bool Connection::
+do_flush() {
+  PRInt32 bytes_to_send = _queued_data.length();
+  if (bytes_to_send == 0) {
+    _queued_count = 0;
+    _queued_data_start = ClockObject::get_global_clock()->get_real_time();
+    PR_Unlock(_write_mutex);
+    return true;
   }
 
+  if (net_cat.is_spam()) {
+    net_cat.spam()
+      << "Sending " << _queued_count << " TCP datagram(s) with " 
+      << bytes_to_send << " total bytes to " << (void *)this << "\n";
+  }
+
+  PRInt32 result;
+  result = PR_Send(_socket,
+                   _queued_data.data(), bytes_to_send,
+                   0,
+                   PR_INTERVAL_NO_TIMEOUT);
   PRErrorCode errcode = PR_GetError();
 
+  _queued_data = string();
+  _queued_count = 0;
+  _queued_data_start = ClockObject::get_global_clock()->get_real_time();
+
   PR_Unlock(_write_mutex);
 
+  return check_send_error(result, errcode, bytes_to_send);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: Connection::check_send_error
+//       Access: Private
+//  Description: Checks the return value of a PR_Send() or PR_SendTo()
+//               call.
+////////////////////////////////////////////////////////////////////
+bool Connection::
+check_send_error(PRInt32 result, PRErrorCode errcode, PRInt32 bytes_to_send) {
   if (result < 0) {
     if (errcode == PR_CONNECT_RESET_ERROR
 #ifdef PR_SOCKET_SHUTDOWN_ERROR
@@ -370,7 +521,7 @@ send_raw_datagram(const NetDatagram &datagram) {
 
     return false;
 
-  } else if (result != bytes_sent) {
+  } else if (result != bytes_to_send) {
     net_cat.error() << "Not enough bytes sent to socket.\n";
     return false;
   }

+ 20 - 5
panda/src/net/connection.h

@@ -19,14 +19,13 @@
 #ifndef CONNECTION_H
 #define CONNECTION_H
 
-#include <pandabase.h>
-
-#include <referenceCount.h>
-
+#include "pandabase.h"
+#include "referenceCount.h"
 #include "netAddress.h"
 
 #include <prio.h>
 #include <prlock.h>
+#include <prerror.h>
 
 class ConnectionManager;
 class NetDatagram;
@@ -46,6 +45,14 @@ PUBLISHED:
 
   PRFileDesc *get_socket() const;
 
+  void set_collect_tcp(bool collect_tcp);
+  bool get_collect_tcp() const;
+  void set_collect_tcp_interval(double interval);
+  double get_collect_tcp_interval() const;
+
+  bool consider_flush();
+  bool flush();
+
   // Socket options.
   void set_nonblock(bool flag);
   void set_linger(bool flag, double time);
@@ -61,12 +68,20 @@ PUBLISHED:
 private:
   bool send_datagram(const NetDatagram &datagram);
   bool send_raw_datagram(const NetDatagram &datagram);
+  bool do_flush();
+  bool check_send_error(PRInt32 result, PRErrorCode errcode, PRInt32 bytes_to_send);
 
   ConnectionManager *_manager;
   PRFileDesc *_socket;
   PRLock *_write_mutex;
 
-friend class ConnectionWriter;
+  bool _collect_tcp;
+  double _collect_tcp_interval;
+  double _queued_data_start;
+  string _queued_data;
+  int _queued_count;
+
+  friend class ConnectionWriter;
 };
 
 #endif

+ 0 - 12
panda/src/net/connectionWriter.cxx

@@ -109,12 +109,6 @@ send(const Datagram &datagram, const PT(Connection) &connection) {
   nassertr(connection != (Connection *)NULL, false);
   nassertr(PR_GetDescType(connection->get_socket()) == PR_DESC_SOCKET_TCP, false);
 
-  if (net_cat.is_debug()) {
-    net_cat.debug()
-      << "Sending TCP datagram of " << datagram.get_length()
-      << " bytes\n";
-  }
-
   NetDatagram copy(datagram);
   copy.set_connection(connection);
 
@@ -151,12 +145,6 @@ send(const Datagram &datagram, const PT(Connection) &connection,
   nassertr(connection != (Connection *)NULL, false);
   nassertr(PR_GetDescType(connection->get_socket()) == PR_DESC_SOCKET_UDP, false);
 
-  if (net_cat.is_debug()) {
-    net_cat.debug()
-      << "Sending UDP datagram of " << datagram.get_length()
-      << " bytes\n";
-  }
-
   if (PR_GetDescType(connection->get_socket()) == PR_DESC_SOCKET_UDP &&
       (int)datagram.get_length() > maximum_udp_datagram) {
     net_cat.warning()

+ 4 - 2
panda/src/net/queuedConnectionReader.cxx

@@ -116,11 +116,13 @@ get_data(Datagram &result) {
 ////////////////////////////////////////////////////////////////////
 void QueuedConnectionReader::
 receive_datagram(const NetDatagram &datagram) {
-  if (net_cat.is_debug()) {
-    net_cat.debug()
+  /*
+  if (net_cat.is_spam()) {
+    net_cat.spam()
       << "Received datagram of " << datagram.get_length()
       << " bytes\n";
   }
+  */
 
 #ifdef SIMULATE_NETWORK_DELAY
   delay_datagram(datagram);