Переглянути джерело

task-threaded GarbageReport

Darren Ranalli 19 роки тому
батько
коміт
a5b3aa1087

+ 236 - 122
direct/src/showbase/GarbageReport.py

@@ -1,6 +1,6 @@
-from direct.directnotify import DirectNotifyGlobal
+from direct.directnotify.DirectNotifyGlobal import directNotify
 from direct.showbase import PythonUtil
 from direct.showbase import PythonUtil
-from direct.showbase.TaskThreaded import TaskThreaded
+from direct.showbase.TaskThreaded import TaskThreaded, TaskThread
 import gc
 import gc
 
 
 class FakeObject:
 class FakeObject:
@@ -16,7 +16,7 @@ class GarbageReport(TaskThreaded):
     """Detects leaked Python objects (via gc.collect()) and reports on garbage
     """Detects leaked Python objects (via gc.collect()) and reports on garbage
     items, garbage-to-garbage references, and garbage cycles.
     items, garbage-to-garbage references, and garbage cycles.
     If you just want to dump the report to the log, use GarbageLogger."""
     If you just want to dump the report to the log, use GarbageLogger."""
-    notify = DirectNotifyGlobal.directNotify.newCategory("GarbageReport")
+    notify = directNotify.newCategory("GarbageReport")
 
 
     NotGarbage = 'NG'
     NotGarbage = 'NG'
 
 
@@ -28,13 +28,8 @@ class GarbageReport(TaskThreaded):
         TaskThreaded.__init__(self, name, threaded)
         TaskThreaded.__init__(self, name, threaded)
         # stick the arguments onto a ScratchPad so we can access them from the thread
         # stick the arguments onto a ScratchPad so we can access them from the thread
         # functions and delete them all at once
         # functions and delete them all at once
-        self._args = ScratchPad()
-        self._args.name = name
-        self._args.log = log
-        self._args.verbose = verbose
-        self._args.fullReport = fullReport
-        self._args.findCycles = findCycles
-        self._args.doneCallback = doneCallback
+        self._args = ScratchPad(name=name, log=log, verbose=verbose, fullReport=fullReport,
+                                findCycles=findCycles, doneCallback=doneCallback)
 
 
         # do the garbage collection
         # do the garbage collection
         wasOn = PythonUtil.gcDebugOn()
         wasOn = PythonUtil.gcDebugOn()
@@ -54,95 +49,207 @@ class GarbageReport(TaskThreaded):
         if self._args.verbose:
         if self._args.verbose:
             self.notify.info('found %s garbage items' % self.numGarbage)
             self.notify.info('found %s garbage items' % self.numGarbage)
 
 
-        self.scheduleNext(self.T_getReferrers)
-    def T_getReferrers(self):
-
-        # grab the referrers (pointing to garbage)
         self.referrersByReference = {}
         self.referrersByReference = {}
         self.referrersByNumber = {}
         self.referrersByNumber = {}
-        if self._args.fullReport:
-            if self._args.verbose:
-                self.notify.info('getting referrers...')
-            # we need referents to detect cycles, but we don't need referrers
-            for i in xrange(self.numGarbage):
-                byNum, byRef = self._getReferrers(self.garbage[i])
-                self.referrersByNumber[i] = byNum
-                self.referrersByReference[i] = byRef
-
-        self.scheduleNext(self.T_getReferents)
-    def T_getReferents(self):
 
 
-        # grab the referents (pointed to by garbage)
         self.referentsByReference = {}
         self.referentsByReference = {}
         self.referentsByNumber = {}
         self.referentsByNumber = {}
-        if self._args.verbose:
-            self.notify.info('getting referents...')
-        for i in xrange(self.numGarbage):
-            byNum, byRef = self._getReferents(self.garbage[i])
-            self.referentsByNumber[i] = byNum
-            self.referentsByReference[i] = byRef
 
 
-        self.scheduleNext(self.T_getCycles)
-    def T_getCycles(self):
+        self.cycles = []
+        self.cycleSets = []
+
+        # grab the referrers (pointing to garbage)
+        class GetReferrers(TaskThread):
+            def setUp(self):
+                if self.parent._args.fullReport and (self.parent.numGarbage == 0):
+                    if self.parent._args.verbose:
+                        self.parent.notify.info('getting referrers...')
+                    self.index = 0
+                else:
+                    self.finished()
+            def run(self):
+                parent = self.parent
+                for i in xrange(self.index, parent.numGarbage):
+                    byNum, byRef = parent._getReferrers(parent.garbage[i])
+                    parent.referrersByNumber[i] = byNum
+                    parent.referrersByReference[i] = byRef
+                    if (not (i & 0x0F)) and (not self.timeLeft()):
+                        # we've run out of time, save the index
+                        self.index = i+1
+                        return
+                self.finished()
+            def done(self):
+                self.parent.scheduleThread(self.parent.getReferents)
+
+        # grab the referents (pointed to by garbage)
+        class GetReferents(TaskThread):
+            def setUp(self):
+                if self.parent.numGarbage == 0:
+                    self.finished()
+                else:
+                    if self.parent._args.verbose:
+                        self.parent.notify.info('getting referents...')
+                    self.index = 0
+            def run(self):
+                parent = self.parent
+                for i in xrange(self.index, self.parent.numGarbage):
+                    byNum, byRef = parent._getReferents(parent.garbage[i])
+                    parent.referentsByNumber[i] = byNum
+                    parent.referentsByReference[i] = byRef
+                    if (not (i & 0x0F)) and (not self.timeLeft()):
+                        # we've run out of time, save the index
+                        self.index = i+1
+                        return
+                self.finished()
+            def done(self):
+                self.parent.scheduleThread(self.parent.getCycles)
 
 
         # find the cycles
         # find the cycles
-        if self._args.findCycles and self.numGarbage > 0:
-            if self._args.verbose:
-                self.notify.info('detecting cycles...')
-            self.cycles = self._getCycles()
-
-        self.scheduleNext(self.T_createReport)
-    def T_createReport(self):
-
-        s = '\n===== GarbageReport: \'%s\' (%s items) =====' % (self._args.name, self.numGarbage)
-        if self.numGarbage > 0:
-            # log each individual item with a number in front of it
-            s += '\n\n===== Garbage Items ====='
-            digits = 0
-            n = self.numGarbage
-            while n > 0:
-                digits += 1
-                n /= 10
-            format = '\n%0' + '%s' % digits + 'i:%s \t%s'
-            for i in range(len(self.garbage)):
-                s += format % (i, type(self.garbage[i]), self.garbage[i])
-
-            if self._args.findCycles:
-                format = '\n%s'
-                s += '\n\n===== Cycles ====='
-                for cycle in self.cycles:
-                    s += format % cycle
-
-            if self._args.fullReport:
-                format = '\n%0' + '%s' % digits + 'i:%s'
-                s += '\n\n===== Referrers By Number (what is referring to garbage item?) ====='
-                for i in xrange(self.numGarbage):
-                    s += format % (i, self.referrersByNumber[i])
-                s += '\n\n===== Referents By Number (what is garbage item referring to?) ====='
-                for i in xrange(self.numGarbage):
-                    s += format % (i, self.referentsByNumber[i])
-                s += '\n\n===== Referrers (what is referring to garbage item?) ====='
-                for i in xrange(self.numGarbage):
-                    s += format % (i, self.referrersByReference[i])
-                s += '\n\n===== Referents (what is garbage item referring to?) ====='
-                for i in xrange(self.numGarbage):
-                    s += format % (i, self.referentsByReference[i])
-
-        self._report = s
-
-        self.scheduleNext(self.T_printReport)
-    def T_printReport(self):
-
-        if self._args.log:
-            self.notify.info(self._report)
-
-        self.scheduleNext(self.T_completed)
-    def T_completed(self):
+        class GetCycles(TaskThread):
+            def setUp(self):
+                if self.parent._args.findCycles and self.parent.numGarbage > 0:
+                    if self.parent._args.verbose:
+                        self.parent.notify.info('detecting cycles...')
+                    self.index = 0
+                else:
+                    self.finished()
+            def run(self):
+                for i in xrange(self.index, self.parent.numGarbage):
+                    self.parent.cycles.extend(self.parent._getCycles(i, self.parent.cycleSets))
+                    if (not (i & 0x0F)) and (not self.timeLeft()):
+                        # we've run out of time, save the index
+                        self.index = i+1
+                        return
+                self.finished()
+            def done(self):
+                self.parent.scheduleThread(self.parent.createReport)
+
+        class CreateReport(TaskThread):
+            def setUp(self):
+                self.s = ['===== GarbageReport: \'%s\' (%s items) =====' % (
+                    self.parent._args.name, self.parent.numGarbage)]
+                if self.parent.numGarbage == 0:
+                    self.finished()
+                else:
+                    self.curPhase = 0
+                    self.index = 0
+            def run(self):
+                if self.curPhase == 0:
+                    # log each individual item with a number in front of it
+                    if self.index == 0:
+                        self.s.append('\n===== Garbage Items =====')
+                        digits = 0
+                        n = self.parent.numGarbage
+                        while n > 0:
+                            digits += 1
+                            n /= 10
+                        self.digits = digits
+                        self.format = '%0' + '%s' % digits + 'i:%s \t%s'
+                    for i in xrange(self.index, self.parent.numGarbage):
+                        self.s.append(self.format % (i, type(self.parent.garbage[i]), self.parent.garbage[i]))
+                        if (not (i & 0x7F)) and (not self.timeLeft()):
+                            # we've run out of time, save the index
+                            self.index = i+1
+                            return
+                    self.curPhase = 1
+                    self.index = 0
+                if self.curPhase == 1:
+                    if self.parent._args.findCycles:
+                        if self.index == 0:
+                            self.s.append('\n===== Cycles =====')
+                        for i in xrange(self.index, len(self.parent.cycles)):
+                            self.s.append('%s' % self.parent.cycles[i])
+                            if (not (i & 0x7F)) and (not self.timeLeft()):
+                                # we've run out of time, save the index
+                                self.index = i+1
+                                return
+                    self.curPhase = 2
+                    self.index = 0
+                if self.parent._args.fullReport:
+                    format = '%0' + '%s' % self.digits + 'i:%s'
+                    if self.curPhase == 2:
+                        if self.index == 0:
+                            self.s.append('\n===== Referrers By Number (what is referring to garbage item?) =====')
+                        for i in xrange(self.index, self.parent.numGarbage):
+                            self.s.append(format % (i, self.parent.referrersByNumber[i]))
+                            if (not (i & 0x7F)) and (not self.timeLeft()):
+                                # we've run out of time, save the index
+                                self.index = i+1
+                                return
+                        self.curPhase = 3
+                        self.index = 0
+                    if self.curPhase == 3:
+                        if self.index == 0:
+                            self.s.append('\n===== Referents By Number (what is garbage item referring to?) =====')
+                        for i in xrange(self.index, self.parent.numGarbage):
+                            self.s.append(format % (i, self.parent.referentsByNumber[i]))
+                            if (not (i & 0x7F)) and (not self.timeLeft()):
+                                # we've run out of time, save the index
+                                self.index = i+1
+                                return
+                        self.curPhase = 4
+                        self.index = 0
+                    if self.curPhase == 4:
+                        if self.index == 0:
+                            self.s.append('\n===== Referrers (what is referring to garbage item?) =====')
+                        for i in xrange(self.index, self.parent.numGarbage):
+                            self.s.append(format % (i, self.parent.referrersByReference[i]))
+                            if (not (i & 0x7F)) and (not self.timeLeft()):
+                                # we've run out of time, save the index
+                                self.index = i+1
+                                return
+                        self.curPhase = 5
+                        self.index = 0
+                    if self.curPhase == 5:
+                        if self.index == 0:
+                            self.s.append('\n===== Referents (what is garbage item referring to?) =====')
+                        for i in xrange(self.index, self.parent.numGarbage):
+                            self.s.append(format % (i, self.referentsByReference[i]))
+                            if (not (i & 0x7F)) and (not self.timeLeft()):
+                                # we've run out of time, save the index
+                                self.index = i+1
+                                return
+                self.finished()
+
+            def done(self):
+                self.parent._report = self.s
+                self.parent.scheduleThread(self.parent.printReport)
+
+        class PrintReport(TaskThread):
+            def setUp(self):
+                if not self.parent._args.log:
+                    self.finished()
+                else:
+                    self.index = 0
+            def run(self):
+                if self.index > 0:
+                    self.parent.notify.info('RESUME')
+                for i in xrange(self.index, len(self.parent._report)):
+                    print self.parent._report[i]
+                    if (not (i & 0x3F)) and (not self.timeLeft()):
+                        self.parent.notify.info('SUSPEND')
+                        # we've run out of time, save the index
+                        self.index = i+1
+                        return
+                self.finished()
+            def done(self):
+                if self.parent._args.doneCallback:
+                    self.parent._args.doneCallback(self.parent)
 
 
-        if self._args.doneCallback:
-            self._args.doneCallback(self)
+        self.getReferrers = GetReferrers()
+        self.getReferents = GetReferents()
+        self.getCycles = GetCycles()
+        self.createReport = CreateReport()
+        self.printReport = PrintReport()
+
+        self.scheduleThread(self.getReferrers)
 
 
     def destroy(self):
     def destroy(self):
+        del self.getReferrers
+        del self.getReferents
+        del self.getCycles
+        del self.createReport
+        del self.printReport
         del self._args
         del self._args
         del self.garbage
         del self.garbage
         del self.numGarbage
         del self.numGarbage
@@ -153,6 +260,8 @@ class GarbageReport(TaskThreaded):
         if hasattr(self, 'cycles'):
         if hasattr(self, 'cycles'):
             del self.cycles
             del self.cycles
         del self._report
         del self._report
+        if hasattr(self, '_reportStr'):
+            del self._reportStr
 
 
     def getNumItems(self):
     def getNumItems(self):
         return self.numGarbage
         return self.numGarbage
@@ -161,7 +270,11 @@ class GarbageReport(TaskThreaded):
         return self.garbage
         return self.garbage
 
 
     def getReport(self):
     def getReport(self):
-        return self._report
+        if not hasattr(self, '_reportStr'):
+            self._reportStr = ''
+            for str in self._report:
+                self._reportStr += '\n' + str
+        return self._reportStr
 
 
     def _getReferrers(self, obj):
     def _getReferrers(self, obj):
         # referrers (pointing to garbage)
         # referrers (pointing to garbage)
@@ -195,51 +308,52 @@ class GarbageReport(TaskThreaded):
                 pass
                 pass
         return byNum, byRef
         return byNum, byRef
 
 
-    def _getCycles(self):
+    def _getCycles(self, index, cycleSets=None):
+        # detect garbage cycles for a particular item of garbage
         assert self.notify.debugCall()
         assert self.notify.debugCall()
         # returns list of lists, sublists are garbage reference cycles
         # returns list of lists, sublists are garbage reference cycles
         cycles = []
         cycles = []
         # sets of cycle members, to avoid duplicates
         # sets of cycle members, to avoid duplicates
-        cycleSets = []
+        if cycleSets is None:
+            cycleSets = []
         stateStack = Stack()
         stateStack = Stack()
-        for rootId in xrange(len(self.garbage)):
-            assert len(stateStack) == 0
-            stateStack.push(([rootId], rootId, 0))
-            while True:
-                if len(stateStack) == 0:
-                    break
-                candidateCycle, curId, resumeIndex = stateStack.pop()
+        rootId = index
+        stateStack.push(([rootId], rootId, 0))
+        while True:
+            if len(stateStack) == 0:
+                break
+            candidateCycle, curId, resumeIndex = stateStack.pop()
+            if self.notify.getDebug():
+                print 'restart: %s root=%s cur=%s resume=%s' % (
+                    candidateCycle, rootId, curId, resumeIndex)
+            for index in xrange(resumeIndex, len(self.referentsByNumber[curId])):
+                refId = self.referentsByNumber[curId][index]
                 if self.notify.getDebug():
                 if self.notify.getDebug():
-                    print 'restart: %s root=%s cur=%s resume=%s' % (
-                        candidateCycle, rootId, curId, resumeIndex)
-                for index in xrange(resumeIndex, len(self.referentsByNumber[curId])):
-                    refId = self.referentsByNumber[curId][index]
-                    if self.notify.getDebug():
-                        print '       : %s -> %s' % (curId, refId)
-                    if refId == rootId:
-                        # we found a cycle! mark it down and move on to the next refId
-                        if not set(candidateCycle) in cycleSets:
-                            if self.notify.getDebug():
-                                print '  FOUND: ', list(candidateCycle) + [refId]
-                            cycles.append(list(candidateCycle) + [refId])
-                            cycleSets.append(set(candidateCycle))
-                    elif refId in candidateCycle:
-                        pass
-                    else:
-                        # this refId does not complete a cycle. Mark down
-                        # where we are in this list of referents, then
-                        # start looking through the referents of the new refId
-                        stateStack.push((list(candidateCycle), curId, index+1))
-                        stateStack.push((list(candidateCycle) + [refId], refId, 0))
-                        break
+                    print '       : %s -> %s' % (curId, refId)
+                if refId == rootId:
+                    # we found a cycle! mark it down and move on to the next refId
+                    if not set(candidateCycle) in cycleSets:
+                        if self.notify.getDebug():
+                            print '  FOUND: ', list(candidateCycle) + [refId]
+                        cycles.append(list(candidateCycle) + [refId])
+                        cycleSets.append(set(candidateCycle))
+                elif refId in candidateCycle:
+                    pass
+                else:
+                    # this refId does not complete a cycle. Mark down
+                    # where we are in this list of referents, then
+                    # start looking through the referents of the new refId
+                    stateStack.push((list(candidateCycle), curId, index+1))
+                    stateStack.push((list(candidateCycle) + [refId], refId, 0))
+                    break
         return cycles
         return cycles
 
 
 class GarbageLogger(GarbageReport):
 class GarbageLogger(GarbageReport):
     """If you just want to log the current garbage to the log file, make
     """If you just want to log the current garbage to the log file, make
     one of these. It automatically destroys itself after logging"""
     one of these. It automatically destroys itself after logging"""
-    def __init__(self, *args, **kArgs):
+    def __init__(self, name, *args, **kArgs):
         kArgs['log'] = True
         kArgs['log'] = True
-        GarbageReport.__init__(self, *args, **kArgs)
+        GarbageReport.__init__(self, name, *args, **kArgs)
     def T_completed(self):
     def T_completed(self):
         GarbageReport.T_completed(self)
         GarbageReport.T_completed(self)
         self.destroy()
         self.destroy()

+ 3 - 1
direct/src/showbase/PythonUtil.py

@@ -1988,7 +1988,9 @@ def gcDebugOn():
 
 
 class ScratchPad:
 class ScratchPad:
     """empty class to stick values onto"""
     """empty class to stick values onto"""
-    pass
+    def __init__(self, **kArgs):
+        for key, value in kArgs.items():
+            setattr(self, key, value)
 
 
 import __builtin__
 import __builtin__
 __builtin__.Functor = Functor
 __builtin__.Functor = Functor

+ 115 - 12
direct/src/showbase/TaskThreaded.py

@@ -1,32 +1,135 @@
+from direct.directnotify.DirectNotifyGlobal import directNotify
 from direct.task import Task
 from direct.task import Task
 
 
 class TaskThreaded:
 class TaskThreaded:
     """ derive from this if you need to do a bunch of CPU-intensive
     """ derive from this if you need to do a bunch of CPU-intensive
     processing and you don't want to hang up the show. Lets you break
     processing and you don't want to hang up the show. Lets you break
     up the processing over multiple frames """
     up the processing over multiple frames """
+    notify = directNotify.newCategory("TaskThreaded")
+
     _Serial = SerialNum()
     _Serial = SerialNum()
     
     
-    def __init__(self, name, threaded=True):
-        self._name = name
-        self._threaded=threaded
-        self._taskNames = set()
+    def __init__(self, name, threaded=True, timeslice=.01):
+        # timeslice is how long this thread should take every frame.
+        self.__name = name
+        self.__threaded=threaded
+        self.__timeslice = timeslice
+        self.__taskNames = set()
+        self._taskStartTime = None
+        self.__threads = set()
 
 
     def destroy(self):
     def destroy(self):
-        for taskName in self._taskNames:
+        for taskName in self.__taskNames:
             taskMgr.remove(taskName)
             taskMgr.remove(taskName)
+        del self.__taskNames
+        for thread in self.__threads:
+            thread.tearDown()
+            thread._destroy()
+        del self.__threads
+
+    def getTimeslice(self):
+        return self.___timeslice
+    def setTimeslice(self, timeslice):
+        self.__timeslice = timeslice
 
 
-    def scheduleNext(self, callback):
-        if not self._threaded:
+    def scheduleCallback(self, callback):
+        assert self.notify.debugCall()
+        if not self.__threaded:
             callback()
             callback()
         else:
         else:
             taskName = ('%s-ThreadedTask-%s' %
             taskName = ('%s-ThreadedTask-%s' %
-                        (self._name, TaskThreaded._Serial.next()))
-            assert taskName not in self._taskNames
-            self._taskNames.add(taskName)
-            taskMgr.add(Functor(self._doCallback, callback, taskName),
+                        (self.__name, TaskThreaded._Serial.next()))
+            assert taskName not in self.__taskNames
+            self.__taskNames.add(taskName)
+            taskMgr.add(Functor(self.__doCallback, callback, taskName),
                         taskName)
                         taskName)
 
 
+    def scheduleThread(self, thread):
+        assert self.notify.debugCall()
+        # pass in a TaskThread. TaskThreaded will take over ownership and
+        # cleanup responsibilities
+        thread._init(self)
+        thread.setUp()
+        if thread.isFinished():
+            thread._destroy()
+        else:
+            if not self.__threaded:
+                thread.run()
+                thread._destroy()
+            else:
+                assert not thread in self.__threads
+                self.__threads.add(thread)
+                taskName = ('%s-ThreadedTask-%s-%s' %
+                            (self.__name, thread.__class__.__name__,
+                             TaskThreaded._Serial.next()))
+                assert taskName not in self.__taskNames
+                self.__taskNames.add(taskName)
+                self.__threads.add(thread)
+                taskMgr.add(Functor(self._doThreadCallback, thread, taskName),
+                            taskName)
+
     def _doCallback(self, callback, taskName, task):
     def _doCallback(self, callback, taskName, task):
-        self._taskNames.remove(taskName)
+        assert self.notify.debugCall()
+        self.__taskNames.remove(taskName)
+        self._taskStartTime = globalClock.getRealTime()
         callback()
         callback()
+        self._taskStartTime = None
         return Task.done
         return Task.done
+
+    def _doThreadCallback(self, thread, taskName, task):
+        assert self.notify.debugCall()
+        self._taskStartTime = globalClock.getRealTime()
+        thread.run()
+        self._taskStartTime = None
+        if thread.isFinished():
+            thread._destroy()
+            self.__taskNames.remove(taskName)
+            self.__threads.remove(thread)
+            return Task.done
+        else:
+            return Task.cont
+
+    def taskTimeLeft(self):
+        """returns True if there is time left for the current task callback
+        to run without going over the allotted timeslice"""
+        if self._taskStartTime is None:
+            # we must not be in a task callback, we must be running in non-threaded
+            # mode
+            return True
+        return (globalClock.getRealTime() - self._taskStartTime) < self.__timeslice
+
+class TaskThread:
+    # derive and override these four funcs
+    # TaskThreaded obj is available as 'self.parent'
+    # attributes of TaskThreaded obj are available directly as self.variable
+    # call self.finished() when you're done
+    def setUp(self):
+        pass
+    def run(self):
+        pass
+    def tearDown(self):
+        # undo what you did in setUp()
+        # this will be called if we get destroyed early
+        pass
+    def done(self):
+        # override this if you want to do stuff after the thread finishes
+        pass
+
+    # call this when your task is complete
+    def finished(self):
+        self.tearDown()
+        self._finished = True
+        self.done()
+    def isFinished(self):
+        return self._finished
+
+    # call this to find out if you've gone over your timeslice
+    def timeLeft(self):
+        return self.parent.taskTimeLeft()
+
+    def _init(self, parent):
+        self.parent = parent
+        self._finished = False
+    def _destroy(self):
+        del self.parent
+        del self._finished