Ver Fonte

Added config parameter: "async-request-num-retries"
Deleted unreachable and code commented out
Improved performance

wesbell há 18 anos atrás
pai
commit
25ddc12ae9
1 ficheiros alterados com 117 adições e 211 exclusões
  1. 117 211
      direct/src/distributed/AsyncRequest.py

+ 117 - 211
direct/src/distributed/AsyncRequest.py

@@ -1,34 +1,15 @@
-
 from otp.ai.AIBaseGlobal import *
 from direct.directnotify import DirectNotifyGlobal
 from direct.showbase.DirectObject import DirectObject
 from ConnectionRepository import *
 
 DefaultTimeout = 8.0
-TimeoutFailureCount = 2
-if __debug__:
-    ForceTimeout = config.GetFloat("async-request-timeout", -1.0)
-    BreakOnTimeout = config.GetBool("async-request-break-on-timeout", 0)
-
-_asyncRequests={}
+DefaultNumRetries = 0
 
-def _addActiveAsyncRequest(asyncRequest):
-    global _asyncRequests
-    _asyncRequests[id(asyncRequest)]=asyncRequest
-
-def _removeActiveAsyncRequest(asyncRequest):
-    global _asyncRequests
-    del _asyncRequests[id(asyncRequest)]
-
-def cleanupAsyncRequests():
-    """
-    Only call this when the application is shuting down.
-    """
-    global _asyncRequests
-    for asyncRequest in _asyncRequests:
-        asyncRequest.delete()
-    assert _asyncRequests == {}
-    _asyncRequests={}
+if __debug__:
+    _overrideTimeoutTimeForAllAsyncRequests = config.GetFloat("async-request-timeout", -1.0)
+    _overrideNumRetriesForAllAsyncRequests = config.GetInt("async-request-num-retries", -1)
+    _breakOnTimeout = config.GetBool("async-request-break-on-timeout", False)
 
 class AsyncRequest(DirectObject):
     """
@@ -54,192 +35,80 @@ class AsyncRequest(DirectObject):
     will be called again when the new self.neededObjects is complete.  You
     may repeat this as necessary.
     """
+    _asyncRequests = {}
+
     if __debug__:
         notify = DirectNotifyGlobal.directNotify.newCategory('AsyncRequest')
 
-    def __init__(self, air, replyToChannelId=None, timeout=DefaultTimeout):
+    def __init__(self, air, replyToChannelId = None,
+                 timeoutTime = DefaultTimeout,
+                 numRetries = DefaultNumRetries):
         """
         air is the AI Respository.
         replyToChannelId may be an avatarId, an accountId, or a channelId.
-        timeout is how many seconds to wait before aborting the request.
+        timeoutTime is how many seconds to wait before aborting the request.
+        numRetries is the number of times to retry the request before giving up.
         """
-        assert self.notify.debugCall()
+        assert AsyncRequest.notify.debugCall()
         if __debug__:
-            self.__deleted=False
-        _addActiveAsyncRequest(self)
-        self.deletingMessage="AsyncRequest-deleting-%s"%(id(self,))
-        #DirectObject.DirectObject.__init__(self)
-        self.air=air
-        self.replyToChannelId=replyToChannelId
+            if _overrideTimeoutTimeForAllAsyncRequests >= 0.0:
+                timeoutTime = _overrideTimeoutTimeForAllAsyncRequests
+            if _overrideNumRetriesForAllAsyncRequests >= 0:
+                numRetries = _overrideNumRetriesForAllAsyncRequests
+        AsyncRequest._asyncRequests[id(self)] = self
+        self.deletingMessage = "AsyncRequest-deleting-%s"%(id(self,))
+        self.air = air
+        self.replyToChannelId = replyToChannelId
         self.timeoutTask = None
-        self._timeoutCount = TimeoutFailureCount
-        self.neededObjects={}
-        if __debug__:
-            if ForceTimeout >= 0.0:
-                timeout = ForceTimeout
-        self.startTimeOut(timeout)
-        
+        self.neededObjects = {}
+        self._timeoutTime = timeoutTime
+        self._initialNumRetries = numRetries
+
     def delete(self):
-        assert self.notify.debugCall()
-        assert not self.__deleted
-        if __debug__:
-            self.__deleted=True
-        _removeActiveAsyncRequest(self)
+        assert AsyncRequest.notify.debugCall()
+        del AsyncRequest._asyncRequests[id(self)]
         self.ignoreAll()
-        self.cancelTimeOut()
-        del self.timeoutTask
+        self._resetTimeoutTask(False)
         messenger.send(self.deletingMessage, [])
-        if 0:
-            for i in self.neededObjects.values():
-                if i is not None:
-                    #self.air.unRegisterForChannel(o.account.doId)
-                    #self.air.removeDOFromTables(o.account)
-                    #if 0:
-                    #    o.account.delete()
-                    #    self.air.deleteObject(o.account.getDoId())
-                    self.air.removeDOFromTables(i)
-                    i.delete()
         del self.neededObjects
         del self.air
         del self.replyToChannelId
-        #DirectObject.DirectObject.delete(self)
-
-    def startTimeOut(self, timeout = None):
-        """
-        Start the request's timer.
-        timeout is the number of seconds to wait before triggering a response
-
-        The kind of response depends what our limits are
-        before finally invoking the user defined timeout()
-        function and on how many times this request has timed
-        out before.
-
-        This is called every time a this request restarts.  For example,
-        if in finish() we have to send the request back again for more
-        data (as with ships), the time resets with each new task.
-        """
-        if timeout:
-            self._timeoutTime = timeout
-        self.cancelTimeOut()
-        self.timeoutTask=taskMgr.doMethodLater(
-            self._timeoutTime, self._timeout, "AsyncRequestTimer-%s"%(id(self,)))
-        # self._timeoutCount = TimeoutFailureCount 
-
-    def cancelTimeOut(self):
-        if self.timeoutTask:
-            taskMgr.remove(self.timeoutTask)
-
-    def _timeout(self, task):
-        self._timeoutCount -= 1
-        if not self._timeoutCount:
-            self.timeout(task)
-        else:
-            assert self.notify.debug('Timed out. Trying %d more time(s) : %s' % (self._timeoutCount + 1, `self.neededObjects`))
-            self.startTimeOut()
-            
-    def timeout(self, task):
-        """
-        If this is called we have not gotten the needed objects in the timeout
-        period.  Derived classes should inform the user or whomever and then
-        call this base method to cleanup.
-        """
-        assert self.notify.debugCall("neededObjects: %s"%(self.neededObjects,))
-        assert not self.__deleted
-        if __debug__:
-            global BreakOnTimeout
-            if BreakOnTimeout:
-                if hasattr(self, "avatarId"):
-                    print "\n\nself.avatarId =", self.avatarId
-                print "\nself.neededObjects =", self.neededObjects
-                print "\ntimed out after %s seconds.\n\n"%(task.delayTime,)
-                import pdb; pdb.set_trace()
-        self.delete()
-
-    def _checkCompletion(self, name, context, distObj):
-        """
-        This checks whether we have all the needed objects and calls
-        finish() if we do.
-        """
-        assert self.notify.debugCall()
-        assert not self.__deleted
-        if name is not None:
-            self.neededObjects[name]=distObj
-        else:
-            self.neededObjects[distObj.doId]=distObj
-        for i in self.neededObjects.values():
-            if i is None:
-                return
-        self.finish()
 
     def askForObjectField(
-            self, dclassName, fieldName, doId, key=None, context=None):
+            self, dclassName, fieldName, doId, key = None, context = None):
         """
         Request an already created object, i.e. read from database.
         """
-        assert self.notify.debugCall()
-        assert not self.__deleted
+        assert AsyncRequest.notify.debugCall()
         if key is None:
             # default the dictionary key to the fieldName
             key = fieldName
         assert doId
-        ## object = self.air.doId2do.get(doId)
-        ## self.neededObjects[key]=object
-        if 0 and object is not None:
-            self._checkCompletion(key, None, object)
-        else:
-            if context is None:
-                context=self.air.allocateContext()
-            self.air.contextToClassName[context]=dclassName
-            self.acceptOnce(
-                "doFieldResponse-%s"%(context,),
-                self._checkCompletion, [key])
-            # self.neededObjects[key] = None
-            self.air.queryObjectField(dclassName, fieldName, doId, context)
-            self.startTimeOut()
-            
-    def askForObject(self, doId, context=None):
+        if context is None:
+            context = self.air.allocateContext()
+        self.air.contextToClassName[context] = dclassName
+        self.acceptOnce(
+            "doFieldResponse-%s"%(context,),
+            self._checkCompletion, [key])
+        self.air.queryObjectField(dclassName, fieldName, doId, context)
+        self._resetTimeoutTask()
+
+    def askForObject(self, doId, context = None):
         """
         Request an already created object, i.e. read from database.
         """
-        assert self.notify.debugCall()
-        assert not self.__deleted
+        assert AsyncRequest.notify.debugCall()
         assert doId
-        #object = self.air.doId2do.get(doId)
-        #self.neededObjects[doId]=object
-        #if object is not None:
-        #    self._checkCompletion(None, context, object)
-        #else:
-        if 1:
-            if context is None:
-                context=self.air.allocateContext()
-            self.acceptOnce(
-                "doRequestResponse-%s"%(context,),
-                self._checkCompletion, [None])
-            # self.neededObjects[doId] = None
-            self.air.queryObjectAll(doId, context)
-            self.startTimeOut()
-
-    #def addInterestInObject(self, doId, context=None):
-    #    """
-    #    Request an already created object, i.e. read from database
-    #    and claim a long term interest in the object (get updates, etc.).
-    #    """
-    #    assert self.notify.debugCall()
-    #    assert doId
-    #    object = self.air.doId2do.get(doId)
-    #    self.neededObjects[doId]=object
-    #    if object is not None:
-    #        self._checkCompletion(None, context, object)
-    #    else:
-    #        if context is None:
-    #            context=self.air.allocateContext()
-    #        self.accept(
-    #            "doRequestResponse-%s"%(context,),
-    #            self._checkCompletion, [None])
-    #        self.air.queryObject(doId, context)
+        if context is None:
+            context = self.air.allocateContext()
+        self.acceptOnce(
+            "doRequestResponse-%s"%(context,),
+            self._checkCompletion, [None])
+        self.air.queryObjectAll(doId, context)
+        self._resetTimeoutTask()
 
     def createObject(self, name, className,
-            databaseId=None, values=None, context=None):
+            databaseId = None, values = None, context = None):
         """
         Create a new database object.  You can get the doId from within
         your self.finish() function.
@@ -250,30 +119,20 @@ class AsyncRequest(DirectObject):
         object (which it is).  This is useful on the AI where we really
         do want the object on the AI.
         """
-        assert self.notify.debugCall()
-        assert not self.__deleted
+        assert AsyncRequest.notify.debugCall()
         assert name
         assert className
         self.neededObjects[name] = None
         if context is None:
-            context=self.air.allocateContext()
+            context = self.air.allocateContext()
         self.accept(
             self.air.getDatabaseGenerateResponseEvent(context),
             self._doCreateObject, [name, className, values])
-        ## newDBRequestGen = config.GetBool(#HACK:
-        ##     'new-database-request-generate', 1)
-        ## if newDBRequestGen:
-        ##     self.accept(
-        ##         self.air.getDatabaseGenerateResponseEvent(context),
-        ##         self._doCreateObject, [name, className, values])
-        ## else:
-        ##     self.accept(
-        ##         "doRequestResponse-%s"%(context,), self._checkCompletion, [name])
         self.air.requestDatabaseGenerate(
-            className, context, databaseId=databaseId, values=values)
-        self.startTimeOut()
+            className, context, databaseId = databaseId, values = values)
+        self._resetTimeoutTask()
 
-    def createObjectId(self, name, className, values=None, context=None):
+    def createObjectId(self, name, className, values = None, context = None):
         """
         Create a new database object.  You can get the doId from within
         your self.finish() function.
@@ -284,24 +143,30 @@ class AsyncRequest(DirectObject):
         object on the UD, we just want the object created and the UD wants
         to send messages to it using the ID.
         """
-        assert self.notify.debugCall()
-        assert not self.__deleted
+        assert AsyncRequest.notify.debugCall()
         assert name
         assert className
         self.neededObjects[name] = None
         if context is None:
-            context=self.air.allocateContext()
+            context = self.air.allocateContext()
         self.accept(
             self.air.getDatabaseGenerateResponseEvent(context),
             self._checkCompletion, [name, None])
-        self.air.requestDatabaseGenerate(className, context, values=values)
-        self.startTimeOut()
+        self.air.requestDatabaseGenerate(className, context, values = values)
+        self._resetTimeoutTask()
 
-    def _doCreateObject(self, name, className, values, doId):
+    def finish(self):
+        """
+        This is the function that gets called when all of the needed objects
+        are in (i.e. all the askForObject and createObject requests have
+        been satisfied).
+        If the other requests timeout, finish will not be called.
+        """
         assert self.notify.debugCall()
-        assert not self.__deleted
+        self.delete()
+
+    def _doCreateObject(self, name, className, values, doId):
         isInDoId2do = doId in self.air.doId2do
-        # TODO: this creates an object with no location
         distObj = self.air.generateGlobalObject(doId, className, values)
         if not isInDoId2do and game.name == 'uberDog':
             # only remove doId if this is the uberdog?, in pirates this was
@@ -311,13 +176,54 @@ class AsyncRequest(DirectObject):
             self.air.doId2do.pop(doId, None)
         self._checkCompletion(name, None, distObj)
 
-    def finish(self):
+    def _checkCompletion(self, name, context, distObj):
         """
-        This is the function that gets called when all of the needed objects
-        are in (i.e. all the askForObject and createObject requests have
-        been satisfied).
-        If the other requests timeout, finish will not be called.
+        This checks whether we have all the needed objects and calls
+        finish() if we do.
         """
-        assert self.notify.debugCall()
-        assert not self.__deleted
-        self.delete()
+        if name is not None:
+            self.neededObjects[name] = distObj
+        else:
+            self.neededObjects[distObj.doId] = distObj
+        for i in self.neededObjects.values():
+            if i is None:
+                return
+        self.finish()
+
+    def _resetTimeoutTask(self, createAnew = True):
+        if self.timeoutTask:
+            taskMgr.remove(self.timeoutTask)
+            self.timeoutTask = None
+        if createAnew:
+            self._numRetries = self._initialNumRetries
+            self.timeoutTask = taskMgr.doMethodLater(
+                self._timeoutTime, self._taskTimeoutCallback,
+                "AsyncRequestTimer-%s"%(id(self,)))
+
+    def _taskTimeoutCallback(self, task):
+        assert AsyncRequest.notify.debugCall(
+            "neededObjects: %s"%(self.neededObjects,))
+        if self._numRetries > 0:
+            assert AsyncRequest.notify.debug(
+                'Timed out. Trying %d more time(s) : %s' %
+                (self._numRetries + 1, `self.neededObjects`))
+            self._numRetries -= 1
+            return Task.again
+        else:
+            if __debug__:
+                if _breakOnTimeout:
+                    if hasattr(self, "avatarId"):
+                        print "\n\nself.avatarId =", self.avatarId
+                    print "\nself.neededObjects =", self.neededObjects
+                    print "\ntimed out after %s seconds.\n\n"%(task.delayTime,)
+                    import pdb; pdb.set_trace()
+            self.delete()
+            return Task.done
+
+def cleanupAsyncRequests():
+    """
+    Only call this when the application is shuting down.
+    """
+    for asyncRequest in AsyncRequest._asyncRequests:
+        asyncRequest.delete()
+    assert AsyncRequest._asyncRequests == {}