Browse Source

added message bundling support to guarantee atomic message processing on the state server

Darren Ranalli 18 years ago
parent
commit
113790f2da

+ 1 - 1
direct/src/dcparser/dcmsgtypes.h

@@ -32,7 +32,7 @@
 #define STATESERVER_OBJECT_UPDATE_FIELD                   2004
 #define STATESERVER_OBJECT_CREATE_WITH_REQUIRED_CONTEXT    2050
 #define STATESERVER_OBJECT_CREATE_WITH_REQUIR_OTHER_CONTEXT  2051
-
+#define STATESERVER_BOUNCE_MESSAGE                           2086
 
 #endif
 

+ 5 - 0
direct/src/distributed/DistributedObjectAI.py

@@ -267,6 +267,11 @@ class DistributedObjectAI(DistributedObjectBase, EnforcesCalldowns):
     def sendSetZone(self, zoneId):
         self.air.sendSetZone(self, zoneId)
 
+    def startMessageBundle(self, name):
+        self.air.startMessageBundle(name)
+    def sendMessageBundle(self):
+        self.air.sendMessageBundle(self.doId)
+
     def getZoneChangeEvent(self):
         # this event is generated whenever this object changes zones.
         # arguments are newZoneId, oldZoneId

+ 75 - 1
direct/src/distributed/cConnectionRepository.cxx

@@ -71,7 +71,8 @@ CConnectionRepository(bool has_owner_view) :
   _msg_sender(0),
   _msg_type(0),
   _has_owner_view(has_owner_view),
-  _handle_c_updates(true)
+  _handle_c_updates(true),
+  _bundling_msgs(false)
 {
 #if defined(HAVE_NET) && defined(SIMULATE_NETWORK_DELAY)
   if (min_lag != 0.0 || max_lag != 0.0) {
@@ -391,6 +392,11 @@ send_datagram(const Datagram &dg) {
     describe_message(nout, "SEND", dg);
   }
 
+  if (_bundling_msgs) {
+    bundle_msg(dg);
+    return false;
+  }
+
 #ifdef WANT_NATIVE_NET
   if(_native)
     return _bdc.SendMessage(dg);
@@ -420,6 +426,74 @@ send_datagram(const Datagram &dg) {
   return false;
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: CConnectionRepository::start_message_bundle
+//       Access: Published
+//  Description: Send a set of messages to the state server that will be processed
+//               atomically
+//               for instance you can do a combined setLocation/setPos and prevent
+//               race conditions where clients briefly get the setLocation but not
+//               the setPos, because the state server hasn't processed the setPos yet
+////////////////////////////////////////////////////////////////////
+void CConnectionRepository::
+start_message_bundle() {
+  // store up network messages until sendMessageBundle is called
+  // all updates in between must be sent from the same doId (updates
+  // must all affect the same DistributedObject)
+  // it is an error to call this again before calling sendMessageBundle
+  // NOTE: this is currently only implemented for setLocation and sendUpdate
+  // msgs
+  nassertv(!_bundling_msgs);
+  _bundling_msgs = true;
+  _bundle_msgs.clear();
+  if (get_verbose()) {
+    nout << "CR::SEND:BUNDLE_START" << endl;
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: CConnectionRepository::send_message_bundle
+//       Access: Published
+//  Description: send network messages queued up since startMessageBundle was called
+////////////////////////////////////////////////////////////////////
+void CConnectionRepository::
+send_message_bundle(unsigned int channel, unsigned int sender_channel) {
+  nassertv(_bundling_msgs);
+
+  Datagram dg;
+  // add server header (see PyDatagram.addServerHeader)
+  dg.add_int8(1);
+  dg.add_uint64(channel);
+  dg.add_uint64(sender_channel);
+  dg.add_uint16(STATESERVER_BOUNCE_MESSAGE);
+  // add each bundled message
+  BundledMsgVector::const_iterator bmi;
+  for (bmi = _bundle_msgs.begin(); bmi != _bundle_msgs.end(); bmi++) {
+    dg.add_string(*bmi);
+  }
+
+  _bundling_msgs = false;
+  _bundle_msgs.clear();
+
+  if (get_verbose()) {
+    nout << "CR::SEND:BUNDLE_FINISH" << endl;
+  }
+
+  // once _bundling_msgs flag is set to false, we can send the datagram
+  send_datagram(dg);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: CConnectionRepository::bundle_msg
+//       Access: Published
+//  Description: send network messages queued up since startMessageBundle was called
+////////////////////////////////////////////////////////////////////
+void CConnectionRepository::
+bundle_msg(const Datagram &dg) {
+  nassertv(_bundling_msgs);
+  _bundle_msgs.push_back(dg.get_message());
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: CConnectionRepository::consider_flush
 //       Access: Published

+ 8 - 0
direct/src/distributed/cConnectionRepository.h

@@ -123,6 +123,10 @@ PUBLISHED:
 
   bool send_datagram(const Datagram &dg);
 
+  void start_message_bundle();
+  void send_message_bundle(unsigned int channel, unsigned int sender_channel);
+  void bundle_msg(const Datagram &dg);
+
   bool consider_flush();
   bool flush();
 
@@ -187,6 +191,10 @@ private:
 
   static const string _overflow_event_name;
 
+  bool _bundling_msgs;
+  typedef std::vector< const string > BundledMsgVector;
+  BundledMsgVector _bundle_msgs;
+
   static PStatCollector _update_pcollector;
 };