Browse Source

more threading improvements

David Rose 17 years ago
parent
commit
cce4f2195e

+ 2 - 3
direct/src/distributed/ClientRepositoryBase.py

@@ -662,12 +662,11 @@ class ClientRepositoryBase(ConnectionRepository):
 
     def sendHeartbeatTask(self, task):
         self.sendHeartbeat()
-        self.waitForNextHeartBeat()
-        return Task.done
+        return Task.again
 
     def waitForNextHeartBeat(self):
         taskMgr.doMethodLater(self.heartbeatInterval, self.sendHeartbeatTask,
-                              "heartBeat")
+                              "heartBeat", taskChain = 'net')
 
     def replaceMethod(self, oldMethod, newFunction):
         return 0

+ 1 - 1
direct/src/distributed/ConnectionRepository.py

@@ -518,7 +518,7 @@ class ConnectionRepository(
         self.accept(CConnectionRepository.getOverflowEventName(),
                     self.handleReaderOverflow)
         taskMgr.add(self.readerPollUntilEmpty, self.uniqueName("readerPollTask"),
-                    priority = self.taskPriority)
+                    priority = self.taskPriority, taskChain = 'net')
 
     def stopReaderPollTask(self):
         taskMgr.remove(self.uniqueName("readerPollTask"))

+ 13 - 4
direct/src/distributed/cConnectionRepository.I

@@ -142,7 +142,7 @@ get_qcr() {
 
 #ifdef WANT_NATIVE_NET
 ////////////////////////////////////////////////////////////////////
-//     Function: CConnectionRepository::get_bcd
+//     Function: CConnectionRepository::get_bdc
 //       Access: Published
 //  Description: Returns the Buffered_DatagramConnection object associated
 //               with the repository.
@@ -162,6 +162,7 @@ get_bdc() {
 ////////////////////////////////////////////////////////////////////
 INLINE void CConnectionRepository::
 get_datagram(Datagram &dg) {
+  ReMutexHolder holder(_lock);
   dg = _dg;
 }
 
@@ -176,6 +177,7 @@ get_datagram(Datagram &dg) {
 ////////////////////////////////////////////////////////////////////
 INLINE void CConnectionRepository::
 get_datagram_iterator(DatagramIterator &di) {
+  ReMutexHolder holder(_lock);
   di = _di;
 }
 
@@ -188,13 +190,15 @@ get_datagram_iterator(DatagramIterator &di) {
 ////////////////////////////////////////////////////////////////////
 INLINE CHANNEL_TYPE CConnectionRepository::
 get_msg_channel(int offset) const {
-   nassertr(offset < (int)_msg_channels.size(),0);
-   return _msg_channels[offset];
+  ReMutexHolder holder(_lock);
+  nassertr(offset < (int)_msg_channels.size(),0);
+  return _msg_channels[offset];
 }
 
 INLINE int CConnectionRepository::
 get_msg_channel_count() const {
-    return _msg_channels.size();
+  ReMutexHolder holder(_lock);
+  return _msg_channels.size();
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -206,6 +210,7 @@ get_msg_channel_count() const {
 ////////////////////////////////////////////////////////////////////
 INLINE CHANNEL_TYPE CConnectionRepository::
 get_msg_sender() const {
+  ReMutexHolder holder(_lock);
   return _msg_sender;
 }
 
@@ -229,6 +234,7 @@ get_msg_sender() const {
 ////////////////////////////////////////////////////////////////////
 INLINE unsigned int CConnectionRepository::
 get_msg_type() const {
+  ReMutexHolder holder(_lock);
   return _msg_type;
 }
 
@@ -251,6 +257,7 @@ get_overflow_event_name() {
 ////////////////////////////////////////////////////////////////////
 INLINE bool CConnectionRepository::
 is_bundling_messages() const {
+  ReMutexHolder holder(_lock);
   return _bundling_msgs > 0;
 }
 
@@ -261,6 +268,7 @@ is_bundling_messages() const {
 ////////////////////////////////////////////////////////////////////
 INLINE void CConnectionRepository::
 set_want_message_bundling(bool flag) {
+  ReMutexHolder holder(_lock);
   // don't allow enable/disable while bundling
   nassertv(_bundling_msgs == 0);
   _want_message_bundling = flag;
@@ -273,6 +281,7 @@ set_want_message_bundling(bool flag) {
 ////////////////////////////////////////////////////////////////////
 INLINE bool CConnectionRepository::
 get_want_message_bundling() const {
+  ReMutexHolder holder(_lock);
   return _want_message_bundling;
 }
 

+ 46 - 10
direct/src/distributed/cConnectionRepository.cxx

@@ -45,9 +45,10 @@ PStatCollector CConnectionRepository::_update_pcollector("App:Show code:readerPo
 ////////////////////////////////////////////////////////////////////
 CConnectionRepository::
 CConnectionRepository(bool has_owner_view) :
+  _lock("CConnectionRepository::_lock"),
 #ifdef HAVE_PYTHON
   _python_repository(NULL),
-    _python_ai_datagramiterator(NULL),
+  _python_ai_datagramiterator(NULL),
 #endif
 #ifdef HAVE_OPENSSL
   _http_conn(NULL),
@@ -107,6 +108,8 @@ CConnectionRepository::
 ////////////////////////////////////////////////////////////////////
 void CConnectionRepository::
 set_connection_http(HTTPChannel *channel) {
+  ReMutexHolder holder(_lock);
+
   disconnect();
   nassertv(channel->is_connection_ready());
   _http_conn = channel->get_connection();
@@ -128,6 +131,8 @@ set_connection_http(HTTPChannel *channel) {
 ////////////////////////////////////////////////////////////////////
 SocketStream *CConnectionRepository::
 get_stream() {
+  ReMutexHolder holder(_lock);
+
   return _http_conn;
 }
 #endif  // HAVE_OPENSSL
@@ -143,6 +148,8 @@ get_stream() {
 ////////////////////////////////////////////////////////////////////
 bool CConnectionRepository::
 try_connect_net(const URLSpec &url) {
+  ReMutexHolder holder(_lock);
+
   disconnect();
 
   _net_conn = 
@@ -168,6 +175,8 @@ try_connect_net(const URLSpec &url) {
 ////////////////////////////////////////////////////////////////////
 bool CConnectionRepository::
 connect_native(const URLSpec &url) {
+  ReMutexHolder holder(_lock);
+
   _native=true;
   Socket_Address addr;
   addr.set_host(url.get_server(),url.get_port());
@@ -198,6 +207,8 @@ connect_native(const URLSpec &url) {
 ////////////////////////////////////////////////////////////////////
 void CConnectionRepository::
 start_delay(double min_delay, double max_delay) {
+  ReMutexHolder holder(_lock);
+
   if (min_delay != 0.0 || max_delay != 0.0) {
 #ifdef HAVE_NET
     _qcr.start_delay(min_delay, max_delay);
@@ -223,6 +234,8 @@ start_delay(double min_delay, double max_delay) {
 ////////////////////////////////////////////////////////////////////
 void CConnectionRepository::
 stop_delay() {
+  ReMutexHolder holder(_lock);
+
 #ifdef HAVE_NET
   _qcr.stop_delay();
 #endif  // HAVE_NET
@@ -235,7 +248,8 @@ stop_delay() {
 #endif  // SIMULATE_NETWORK_DELAY
 
 ////////////////////////////////////////////////////////////////////
-//     Function: CConnectionRepository::check_datagram//       Access: Published
+//     Function: CConnectionRepository::check_datagram
+//       Access: Published
 //  Description: Returns true if a new datagram is available, false
 //               otherwise.  If the return value is true, the new
 //               datagram may be retrieved via get_datagram(), or
@@ -244,6 +258,8 @@ stop_delay() {
 ////////////////////////////////////////////////////////////////////
 bool CConnectionRepository::
 check_datagram() {
+  ReMutexHolder holder(_lock);
+
   if (_simulated_disconnect) {
     return false;
   }
@@ -331,6 +347,7 @@ check_datagram() {
 ////////////////////////////////////////////////////////////////////
 bool CConnectionRepository::
 is_connected() {
+  ReMutexHolder holder(_lock);
 
 #ifdef WANT_NATIVE_NET
   if(_native)
@@ -373,12 +390,14 @@ is_connected() {
 //     Function: CConnectionRepository::send_datagram
 //       Access: Published
 //  Description: Queues the indicated datagram for sending to the
-//               server.  It may not get send immediately if
+//               server.  It may not get sent immediately if
 //               collect_tcp is in effect; call flush() to guarantee
 //               it is sent now.
 ////////////////////////////////////////////////////////////////////
 bool CConnectionRepository::
 send_datagram(const Datagram &dg) {
+  ReMutexHolder holder(_lock);
+
   if (_simulated_disconnect) {
     distributed_cat.warning()
       << "Unable to send datagram during simulated disconnect.\n";
@@ -426,14 +445,17 @@ send_datagram(const Datagram &dg) {
 ////////////////////////////////////////////////////////////////////
 //     Function: CConnectionRepository::start_message_bundle
 //       Access: Published
-//  Description: Send a set of messages to the state server that will be processed
-//               atomically
-//               for instance you can do a combined setLocation/setPos and prevent
-//               race conditions where clients briefly get the setLocation but not
-//               the setPos, because the state server hasn't processed the setPos yet
+//  Description: Send a set of messages to the state server that will
+//               be processed atomically.  For instance, you can do a
+//               combined setLocation/setPos and prevent race
+//               conditions where clients briefly get the setLocation
+//               but not the setPos, because the state server hasn't
+//               processed the setPos yet
 ////////////////////////////////////////////////////////////////////
 void CConnectionRepository::
 start_message_bundle() {
+  ReMutexHolder holder(_lock);
+
   // store up network messages until sendMessageBundle is called
   // all updates in between must be sent from the same doId (updates
   // must all affect the same DistributedObject)
@@ -450,10 +472,12 @@ start_message_bundle() {
 ////////////////////////////////////////////////////////////////////
 //     Function: CConnectionRepository::send_message_bundle
 //       Access: Published
-//  Description: send network messages queued up since startMessageBundle was called
+//  Description: Send network messages queued up since
+//               startMessageBundle was called.
 ////////////////////////////////////////////////////////////////////
 void CConnectionRepository::
 send_message_bundle(unsigned int channel, unsigned int sender_channel) {
+  ReMutexHolder holder(_lock);
   nassertv(_bundling_msgs);
 
   --_bundling_msgs;
@@ -488,6 +512,8 @@ send_message_bundle(unsigned int channel, unsigned int sender_channel) {
 ////////////////////////////////////////////////////////////////////
 void CConnectionRepository::
 abandon_message_bundles() {
+  ReMutexHolder holder(_lock);
+
   nassertv(is_bundling_messages());
   _bundling_msgs = 0;
   _bundle_msgs.clear();
@@ -496,10 +522,12 @@ abandon_message_bundles() {
 ////////////////////////////////////////////////////////////////////
 //     Function: CConnectionRepository::bundle_msg
 //       Access: Published
-//  Description: send network messages queued up since startMessageBundle was called
+//  Description:
 ////////////////////////////////////////////////////////////////////
 void CConnectionRepository::
 bundle_msg(const Datagram &dg) {
+  ReMutexHolder holder(_lock);
+
   nassertv(is_bundling_messages());
   _bundle_msgs.push_back(dg.get_message());
 }
@@ -513,6 +541,8 @@ bundle_msg(const Datagram &dg) {
 ////////////////////////////////////////////////////////////////////
 bool CConnectionRepository::
 consider_flush() {
+  ReMutexHolder holder(_lock);
+
   if (_simulated_disconnect) {
     return false;
   }
@@ -546,6 +576,8 @@ consider_flush() {
 ////////////////////////////////////////////////////////////////////
 bool CConnectionRepository::
 flush() {
+  ReMutexHolder holder(_lock);
+
   if (_simulated_disconnect) {
     return false;
   }
@@ -576,6 +608,8 @@ flush() {
 ////////////////////////////////////////////////////////////////////
 void CConnectionRepository::
 disconnect() {
+  ReMutexHolder holder(_lock);
+
   #ifdef WANT_NATIVE_NET
   if(_native) {
     _bdc.Reset();
@@ -939,6 +973,7 @@ describe_message(ostream &out, const string &prefix,
 
 bool CConnectionRepository::network_based_reader_and_yielder(PyObject *PycallBackFunction,ClockObject &clock, float returnBy)
 {
+  ReMutexHolder holder(_lock);
     while(is_connected())
     {
         check_datagram_ai(PycallBackFunction);
@@ -956,6 +991,7 @@ bool CConnectionRepository::network_based_reader_and_yielder(PyObject *PycallBac
 
 bool CConnectionRepository::check_datagram_ai(PyObject *PycallBackFunction)
 {
+  ReMutexHolder holder(_lock);
     // these could be static .. not 
   PyObject *doId2do = NULL; 
 

+ 5 - 0
direct/src/distributed/cConnectionRepository.h

@@ -24,6 +24,8 @@
 #include "pStatCollector.h"
 #include "datagramIterator.h"
 #include "clockObject.h"
+#include "reMutex.h"
+#include "reMutexHolder.h"
 
 #ifdef HAVE_NET
 #include "queuedConnectionManager.h"
@@ -155,6 +157,9 @@ private:
   void describe_message(ostream &out, const string &prefix, 
                         const Datagram &dg) const;
 
+private:
+  ReMutex _lock;
+
 #ifdef HAVE_PYTHON
   PyObject *_python_repository;
   PyObject *_python_ai_datagramiterator;