Browse Source

handle more exotic uses of taskMgr

David Rose 17 years ago
parent
commit
7e217b8bdc

+ 3 - 2
direct/src/showbase/DirectObject.py

@@ -50,7 +50,6 @@ class DirectObject:
             self._taskList = {}
         kwargs['owner']=self
         task = taskMgr.add(*args, **kwargs)
-        self._taskList[task.id] = task
         return task
     
     def doMethodLater(self, *args, **kwargs):
@@ -58,7 +57,6 @@ class DirectObject:
             self._taskList ={}
         kwargs['owner']=self            
         task = taskMgr.doMethodLater(*args, **kwargs)
-        self._taskList[task.id] = task
         return task
     
     def removeTask(self, taskOrName):
@@ -77,6 +75,9 @@ class DirectObject:
             for task in self._taskList.values():
                 task.remove()
 
+    def _addTask(self, task):
+        self._taskList[task.id] = task
+
     def _clearTask(self, task):
         del self._taskList[task.id]        
         

+ 567 - 16
direct/src/task/TaskNew.py

@@ -6,6 +6,7 @@ from direct.directnotify.DirectNotifyGlobal import *
 from direct.showbase import ExceptionVarDump
 import signal
 import types
+import time
 
 from pandac.PandaModules import *
 
@@ -66,20 +67,24 @@ class TaskManager:
     notify = directNotify.newCategory("TaskManager")
 
     extendedExceptions = False
+    MaxEpochSpeed = 1.0/30.0
 
     def __init__(self):
         self.mgr = AsyncTaskManager('TaskManager')
 
         self.resumeFunc = None
-        self.globalClock = None
+        self.globalClock = self.mgr.getClock()
         self.stepping = False
         self.running = False
         self.fKeyboardInterrupt = False
         self.interruptCount = 0
 
     def destroy(self):
-        self.mgr.stopThreads()
-        self.removeTasksMatching('*')
+        self.mgr.cleanup()
+
+    def setClock(self, clockObject):
+        self.mgr.setClock(clockObject)
+        self.globalClock = clockObject
 
     def keyboardInterruptHandler(self, signalNumber, stackFrame):
         self.fKeyboardInterrupt = 1
@@ -137,11 +142,24 @@ class TaskManager:
             chain.setFrameBudget(frameBudget)
 
     def hasTaskNamed(self, taskName):
+        """Returns true if there is at least one task, active or
+        sleeping, with the indicated name. """
+        
         return bool(self.mgr.findTask(taskName))
 
     def getTasksNamed(self, taskName):
+        """Returns a list of all tasks, active or sleeping, with the
+        indicated name. """
         return self.__makeTaskList(self.mgr.findTasks(taskName))
 
+    def getTasks(self):
+        """Returns list of all active tasks in arbitrary order. """
+        return self.__makeTaskList(self.mgr.getActiveTasks())
+
+    def getDoLaters(self):
+        """Returns list of all sleeping tasks in arbitrary order. """
+        return self.__makeTaskList(self.mgr.getSleepingTasks())
+
     def __makeTaskList(self, taskCollection):
         l = []
         for i in range(taskCollection.getNumTasks()):
@@ -150,27 +168,28 @@ class TaskManager:
 
     def doMethodLater(self, delayTime, funcOrTask, name, priority = None,
                       sort = None, extraArgs = None, taskChain = None,
-                      appendTask = False):
+                      appendTask = False, owner = None, uponDeath = None):
         if delayTime < 0:
             assert self.notify.warning('doMethodLater: added task: %s with negative delay: %s' % (name, delayTime))
 
-        task = self.__setupTask(funcOrTask, name, priority, sort, extraArgs, taskChain, appendTask)
+        task = self.__setupTask(funcOrTask, name, priority, sort, extraArgs, taskChain, appendTask, owner, uponDeath)
         task.setDelay(delayTime)
         self.mgr.add(task)
         return task
 
     def add(self, funcOrTask, name, priority = None, sort = None,
-            extraArgs = None, taskChain = None, appendTask = False):
+            extraArgs = None, taskChain = None, appendTask = False,
+            owner = None, uponDeath = None):
         
         """
         Add a new task to the taskMgr.
         You can add a Task object or a method that takes one argument.
         """
-        task = self.__setupTask(funcOrTask, name, priority, sort, extraArgs, taskChain, appendTask)
+        task = self.__setupTask(funcOrTask, name, priority, sort, extraArgs, taskChain, appendTask, owner, uponDeath)
         self.mgr.add(task)
         return task
 
-    def __setupTask(self, funcOrTask, name, priority, sort, extraArgs, taskChain, appendTask):
+    def __setupTask(self, funcOrTask, name, priority, sort, extraArgs, taskChain, appendTask, owner, uponDeath):
         if isinstance(funcOrTask, PythonTask):
             task = funcOrTask
         elif callable(funcOrTask):
@@ -179,6 +198,11 @@ class TaskManager:
             self.notify.error(
                 'add: Tried to add a task that was not a Task or a func')
         assert isinstance(name, types.StringTypes), 'Name must be a string type'
+        if extraArgs is None:
+            extraArgs = []
+            appendTask = True
+        task.setArgs(extraArgs, appendTask)
+
         task.setName(name)
 
         # For historical reasons, if priority is specified but not
@@ -191,14 +215,20 @@ class TaskManager:
             if sort is not None:
                 task.setSort(sort)
 
-        if taskChain is not None:
-            task.setTaskChain(taskChain)
-
         if extraArgs is None:
             extraArgs = []
             appendTask = True
         task.setArgs(extraArgs, appendTask)
 
+        if taskChain is not None:
+            task.setTaskChain(taskChain)
+
+        if owner is not None:
+            task.setOwner(owner)
+
+        if uponDeath is not None:
+            task.setUponDeath(uponDeath)
+
         return task
         
     def remove(self, taskOrName):
@@ -226,7 +256,13 @@ class TaskManager:
         self.interruptCount = 0
         signal.signal(signal.SIGINT, self.keyboardInterruptHandler)
 
+        startFrameTime = self.globalClock.getRealTime()
+
         self.mgr.poll()
+
+        # This is the spot for an internal yield function
+        nextTaskTime = self.mgr.getNextWakeTime()
+        self.doYield(startFrameTime, nextTaskTime)
         
         # Restore default interrupt handler
         signal.signal(signal.SIGINT, signal.default_int_handler)
@@ -236,11 +272,10 @@ class TaskManager:
     def run(self):
         # Set the clock to have last frame's time in case we were
         # Paused at the prompt for a long time
-        if self.globalClock:
-            t = self.globalClock.getFrameTime()
-            timeDelta = t - globalClock.getRealTime()
-            self.globalClock.setRealTime(t)
-            messenger.send("resetClock", [timeDelta])
+        t = self.globalClock.getFrameTime()
+        timeDelta = t - globalClock.getRealTime()
+        self.globalClock.setRealTime(t)
+        messenger.send("resetClock", [timeDelta])
 
         if self.resumeFunc != None:
             self.resumeFunc()
@@ -299,3 +334,519 @@ class TaskManager:
 
     def __repr__(self):
         return str(self.mgr)
+
+    # In the event we want to do frame time management, this is the
+    # function to replace or overload.
+    def doYield(self, frameStartTime, nextScheduledTaskTime):
+          None
+          
+    def doYieldExample(self, frameStartTime, nextScheduledTaskTime):
+        minFinTime = frameStartTime + self.MaxEpochSpeed
+        if nextScheduledTaskTime > 0 and nextScheduledTaskTime < minFinTime:
+            print ' Adjusting Time'
+            minFinTime = nextScheduledTaskTime
+        delta = minFinTime - self.globalClock.getRealTime()
+        while(delta > 0.002):
+            print ' sleep %s'% (delta)
+            time.sleep(delta)           
+            delta = minFinTime - self.globalClock.getRealTime()
+    
+    if __debug__:
+        # to catch memory leaks during the tests at the bottom of the file
+        def _startTrackingMemLeaks(self):
+            pass
+
+        def _stopTrackingMemLeaks(self):
+            pass
+
+        def _checkMemLeaks(self):
+            pass
+
+    def _runTests(self):
+        if __debug__:
+            tm = TaskManager()
+            tm.setClock(ClockObject())
+            tm.setupTaskChain("default", tickClock = True)
+
+            # check for memory leaks after every test
+            tm._startTrackingMemLeaks()
+            tm._checkMemLeaks()
+
+            # run-once task
+            l = []
+            def _testDone(task, l=l):
+                l.append(None)
+                return task.done
+            tm.add(_testDone, 'testDone')
+            tm.step()
+            assert len(l) == 1
+            tm.step()
+            assert len(l) == 1
+            _testDone = None
+            tm._checkMemLeaks()
+
+            # remove by name
+            def _testRemoveByName(task):
+                return task.done
+            tm.add(_testRemoveByName, 'testRemoveByName')
+            assert tm.remove('testRemoveByName') == 1
+            assert tm.remove('testRemoveByName') == 0
+            _testRemoveByName = None
+            tm._checkMemLeaks()
+
+            # duplicate named tasks
+            def _testDupNamedTasks(task):
+                return task.done
+            tm.add(_testDupNamedTasks, 'testDupNamedTasks')
+            tm.add(_testDupNamedTasks, 'testDupNamedTasks')
+            assert tm.remove('testRemoveByName') == 0
+            _testDupNamedTasks = None
+            tm._checkMemLeaks()
+
+            # continued task
+            l = []
+            def _testCont(task, l = l):
+                l.append(None)
+                return task.cont
+            tm.add(_testCont, 'testCont')
+            tm.step()
+            assert len(l) == 1
+            tm.step()
+            assert len(l) == 2
+            tm.remove('testCont')
+            _testCont = None
+            tm._checkMemLeaks()
+
+            # continue until done task
+            l = []
+            def _testContDone(task, l = l):
+                l.append(None)
+                if len(l) >= 2:
+                    return task.done
+                else:
+                    return task.cont
+            tm.add(_testContDone, 'testContDone')
+            tm.step()
+            assert len(l) == 1
+            tm.step()
+            assert len(l) == 2
+            tm.step()
+            assert len(l) == 2
+            assert not tm.hasTaskNamed('testContDone')
+            _testContDone = None
+            tm._checkMemLeaks()
+
+            # hasTaskNamed
+            def _testHasTaskNamed(task):
+                return task.done
+            tm.add(_testHasTaskNamed, 'testHasTaskNamed')
+            assert tm.hasTaskNamed('testHasTaskNamed')
+            tm.step()
+            assert not tm.hasTaskNamed('testHasTaskNamed')
+            _testHasTaskNamed = None
+            tm._checkMemLeaks()
+
+            # task sort
+            l = []
+            def _testPri1(task, l = l):
+                l.append(1)
+                return task.cont
+            def _testPri2(task, l = l):
+                l.append(2)
+                return task.cont
+            tm.add(_testPri1, 'testPri1', sort = 1)
+            tm.add(_testPri2, 'testPri2', sort = 2)
+            tm.step()
+            assert len(l) == 2
+            assert l == [1, 2,]
+            tm.step()
+            assert len(l) == 4
+            assert l == [1, 2, 1, 2,]
+            tm.remove('testPri1')
+            tm.remove('testPri2')
+            _testPri1 = None
+            _testPri2 = None
+            tm._checkMemLeaks()
+
+            # task extraArgs
+            l = []
+            def _testExtraArgs(arg1, arg2, l=l):
+                l.extend([arg1, arg2,])
+                return done
+            tm.add(_testExtraArgs, 'testExtraArgs', extraArgs=[4,5])
+            tm.step()
+            assert len(l) == 2
+            assert l == [4, 5,]
+            _testExtraArgs = None
+            tm._checkMemLeaks()
+
+            # task appendTask
+            l = []
+            def _testAppendTask(arg1, arg2, task, l=l):
+                l.extend([arg1, arg2,])
+                return task.done
+            tm.add(_testAppendTask, '_testAppendTask', extraArgs=[4,5], appendTask=True)
+            tm.step()
+            assert len(l) == 2
+            assert l == [4, 5,]
+            _testAppendTask = None
+            tm._checkMemLeaks()
+
+            # task uponDeath
+            l = []
+            def _uponDeathFunc(task, l=l):
+                l.append(task.name)
+            def _testUponDeath(task):
+                return done
+            tm.add(_testUponDeath, 'testUponDeath', uponDeath=_uponDeathFunc)
+            tm.step()
+            assert len(l) == 1
+            assert l == ['testUponDeath']
+            _testUponDeath = None
+            _uponDeathFunc = None
+            tm._checkMemLeaks()
+
+            # task owner
+            class _TaskOwner:
+                def _addTask(self, task):
+                    self.addedTaskName = task.name
+                def _clearTask(self, task):
+                    self.clearedTaskName = task.name
+            to = _TaskOwner()
+            l = []
+            def _testOwner(task):
+                return done
+            tm.add(_testOwner, 'testOwner', owner=to)
+            tm.step()
+            assert getattr(to, 'addedTaskName', None) == 'testOwner'
+            assert getattr(to, 'clearedTaskName', None) == 'testOwner'
+            _testOwner = None
+            del to
+            _TaskOwner = None
+            tm._checkMemLeaks()
+
+
+            doLaterTests = [0,]
+
+            # doLater
+            l = []
+            def _testDoLater1(task, l=l):
+                l.append(1)
+            def _testDoLater2(task, l=l):
+                l.append(2)
+            def _monitorDoLater(task, tm=tm, l=l, doLaterTests=doLaterTests):
+                if task.time > .03:
+                    assert l == [1, 2,]
+                    doLaterTests[0] -= 1
+                    return task.done
+                return task.cont
+            tm.doMethodLater(.01, _testDoLater1, 'testDoLater1')
+            tm.doMethodLater(.02, _testDoLater2, 'testDoLater2')
+            doLaterTests[0] += 1
+            # make sure we run this task after the doLaters if they all occur on the same frame
+            tm.add(_monitorDoLater, 'monitorDoLater', sort=10)
+            _testDoLater1 = None
+            _testDoLater2 = None
+            _monitorDoLater = None
+            # don't check until all the doLaters are finished
+            #tm._checkMemLeaks()
+
+            # doLater sort
+            l = []
+            def _testDoLaterPri1(task, l=l):
+                l.append(1)
+            def _testDoLaterPri2(task, l=l):
+                l.append(2)
+            def _monitorDoLaterPri(task, tm=tm, l=l, doLaterTests=doLaterTests):
+                if task.time > .02:
+                    assert l == [1, 2,]
+                    doLaterTests[0] -= 1
+                    return task.done
+                return task.cont
+            tm.doMethodLater(.01, _testDoLaterPri1, 'testDoLaterPri1', sort=1)
+            tm.doMethodLater(.01, _testDoLaterPri2, 'testDoLaterPri2', sort=2)
+            doLaterTests[0] += 1
+            # make sure we run this task after the doLaters if they all occur on the same frame
+            tm.add(_monitorDoLaterPri, 'monitorDoLaterPri', sort=10)
+            _testDoLaterPri1 = None
+            _testDoLaterPri2 = None
+            _monitorDoLaterPri = None
+            # don't check until all the doLaters are finished
+            #tm._checkMemLeaks()
+
+            # doLater extraArgs
+            l = []
+            def _testDoLaterExtraArgs(arg1, l=l):
+                l.append(arg1)
+            def _monitorDoLaterExtraArgs(task, tm=tm, l=l, doLaterTests=doLaterTests):
+                if task.time > .02:
+                    assert l == [3,]
+                    doLaterTests[0] -= 1
+                    return task.done
+                return task.cont
+            tm.doMethodLater(.01, _testDoLaterExtraArgs, 'testDoLaterExtraArgs', extraArgs=[3,])
+            doLaterTests[0] += 1
+            # make sure we run this task after the doLaters if they all occur on the same frame
+            tm.add(_monitorDoLaterExtraArgs, 'monitorDoLaterExtraArgs', sort=10)
+            _testDoLaterExtraArgs = None
+            _monitorDoLaterExtraArgs = None
+            # don't check until all the doLaters are finished
+            #tm._checkMemLeaks()
+
+            # doLater appendTask
+            l = []
+            def _testDoLaterAppendTask(arg1, task, l=l):
+                assert task.name == 'testDoLaterAppendTask'
+                l.append(arg1)
+            def _monitorDoLaterAppendTask(task, tm=tm, l=l, doLaterTests=doLaterTests):
+                if task.time > .02:
+                    assert l == [4,]
+                    doLaterTests[0] -= 1
+                    return task.done
+                return task.cont
+            tm.doMethodLater(.01, _testDoLaterAppendTask, 'testDoLaterAppendTask',
+                             extraArgs=[4,], appendTask=True)
+            doLaterTests[0] += 1
+            # make sure we run this task after the doLaters if they all occur on the same frame
+            tm.add(_monitorDoLaterAppendTask, 'monitorDoLaterAppendTask', sort=10)
+            _testDoLaterAppendTask = None
+            _monitorDoLaterAppendTask = None
+            # don't check until all the doLaters are finished
+            #tm._checkMemLeaks()
+
+            # doLater uponDeath
+            l = []
+            def _testUponDeathFunc(task, l=l):
+                assert task.name == 'testDoLaterUponDeath'
+                l.append(10)
+            def _testDoLaterUponDeath(arg1, l=l):
+                return done
+            def _monitorDoLaterUponDeath(task, tm=tm, l=l, doLaterTests=doLaterTests):
+                if task.time > .02:
+                    assert l == [10,]
+                    doLaterTests[0] -= 1
+                    return task.done
+                return task.cont
+            tm.doMethodLater(.01, _testDoLaterUponDeath, 'testDoLaterUponDeath',
+                             uponDeath=_testUponDeathFunc)
+            doLaterTests[0] += 1
+            # make sure we run this task after the doLaters if they all occur on the same frame
+            tm.add(_monitorDoLaterUponDeath, 'monitorDoLaterUponDeath', sort=10)
+            _testUponDeathFunc = None
+            _testDoLaterUponDeath = None
+            _monitorDoLaterUponDeath = None
+            # don't check until all the doLaters are finished
+            #tm._checkMemLeaks()
+
+            # doLater owner
+            class _DoLaterOwner:
+                def _addTask(self, task):
+                    self.addedTaskName = task.name
+                def _clearTask(self, task):
+                    self.clearedTaskName = task.name
+            doLaterOwner = _DoLaterOwner()
+            l = []
+            def _testDoLaterOwner(l=l):
+                pass
+            def _monitorDoLaterOwner(task, tm=tm, l=l, doLaterOwner=doLaterOwner,
+                                     doLaterTests=doLaterTests):
+                if task.time > .02:
+                    assert getattr(doLaterOwner, 'addedTaskName', None) == 'testDoLaterOwner'
+                    assert getattr(doLaterOwner, 'clearedTaskName', None) == 'testDoLaterOwner'
+                    doLaterTests[0] -= 1
+                    return task.done
+                return task.cont
+            tm.doMethodLater(.01, _testDoLaterOwner, 'testDoLaterOwner',
+                             owner=doLaterOwner)
+            doLaterTests[0] += 1
+            # make sure we run this task after the doLaters if they all occur on the same frame
+            tm.add(_monitorDoLaterOwner, 'monitorDoLaterOwner', sort=10)
+            _testDoLaterOwner = None
+            _monitorDoLaterOwner = None
+            del doLaterOwner
+            _DoLaterOwner = None
+            # don't check until all the doLaters are finished
+            #tm._checkMemLeaks()
+
+            # run the doLater tests
+            while doLaterTests[0] > 0:
+                tm.step()
+            del doLaterTests
+            tm._checkMemLeaks()
+
+            # getTasks
+            def _testGetTasks(task):
+                return task.cont
+            # No doLaterProcessor in the new world.
+            assert len(tm.getTasks()) == 0
+            tm.add(_testGetTasks, 'testGetTasks1')
+            assert len(tm.getTasks()) == 1
+            assert (tm.getTasks()[0].name == 'testGetTasks1' or
+                    tm.getTasks()[1].name == 'testGetTasks1')
+            tm.add(_testGetTasks, 'testGetTasks2')
+            tm.add(_testGetTasks, 'testGetTasks3')
+            assert len(tm.getTasks()) == 3
+            tm.remove('testGetTasks2')
+            assert len(tm.getTasks()) == 2
+            tm.remove('testGetTasks1')
+            tm.remove('testGetTasks3')
+            assert len(tm.getTasks()) == 0
+            _testGetTasks = None
+            tm._checkMemLeaks()
+
+            # getDoLaters
+            def _testGetDoLaters():
+                pass
+            assert len(tm.getDoLaters()) == 0
+            tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater1')
+            assert len(tm.getDoLaters()) == 1
+            assert tm.getDoLaters()[0].name == 'testDoLater1'
+            tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater2')
+            tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater3')
+            assert len(tm.getDoLaters()) == 3
+            tm.remove('testDoLater2')
+            assert len(tm.getDoLaters()) == 2
+            tm.remove('testDoLater1')
+            tm.remove('testDoLater3')
+            assert len(tm.getDoLaters()) == 0
+            _testGetDoLaters = None
+            tm._checkMemLeaks()
+
+            # duplicate named doLaters removed via taskMgr.remove
+            def _testDupNameDoLaters():
+                pass
+            # the doLaterProcessor is always running
+            tm.doMethodLater(.1, _testDupNameDoLaters, 'testDupNameDoLater')
+            tm.doMethodLater(.1, _testDupNameDoLaters, 'testDupNameDoLater')
+            assert len(tm.getDoLaters()) == 2
+            tm.remove('testDupNameDoLater')
+            assert len(tm.getDoLaters()) == 0
+            _testDupNameDoLaters = None
+            tm._checkMemLeaks()
+
+            # duplicate named doLaters removed via remove()
+            def _testDupNameDoLatersRemove():
+                pass
+            # the doLaterProcessor is always running
+            dl1 = tm.doMethodLater(.1, _testDupNameDoLatersRemove, 'testDupNameDoLaterRemove')
+            dl2 = tm.doMethodLater(.1, _testDupNameDoLatersRemove, 'testDupNameDoLaterRemove')
+            assert len(tm.getDoLaters()) == 2
+            dl2.remove()
+            assert len(tm.getDoLaters()) == 1
+            dl1.remove()
+            assert len(tm.getDoLaters()) == 0
+            _testDupNameDoLatersRemove = None
+            # nameDict etc. isn't cleared out right away with task.remove()
+            tm._checkMemLeaks()
+
+            # getTasksNamed
+            def _testGetTasksNamed(task):
+                return task.cont
+            assert len(tm.getTasksNamed('testGetTasksNamed')) == 0
+            tm.add(_testGetTasksNamed, 'testGetTasksNamed')
+            assert len(tm.getTasksNamed('testGetTasksNamed')) == 1
+            assert tm.getTasksNamed('testGetTasksNamed')[0].name == 'testGetTasksNamed'
+            tm.add(_testGetTasksNamed, 'testGetTasksNamed')
+            tm.add(_testGetTasksNamed, 'testGetTasksNamed')
+            assert len(tm.getTasksNamed('testGetTasksNamed')) == 3
+            tm.remove('testGetTasksNamed')
+            assert len(tm.getTasksNamed('testGetTasksNamed')) == 0
+            _testGetTasksNamed = None
+            tm._checkMemLeaks()
+
+            # removeTasksMatching
+            def _testRemoveTasksMatching(task):
+                return task.cont
+            tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching')
+            assert len(tm.getTasksNamed('testRemoveTasksMatching')) == 1
+            tm.removeTasksMatching('testRemoveTasksMatching')
+            assert len(tm.getTasksNamed('testRemoveTasksMatching')) == 0
+            tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching1')
+            tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching2')
+            assert len(tm.getTasksNamed('testRemoveTasksMatching1')) == 1
+            assert len(tm.getTasksNamed('testRemoveTasksMatching2')) == 1
+            tm.removeTasksMatching('testRemoveTasksMatching*')
+            assert len(tm.getTasksNamed('testRemoveTasksMatching1')) == 0
+            assert len(tm.getTasksNamed('testRemoveTasksMatching2')) == 0
+            tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching1a')
+            tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching2a')
+            assert len(tm.getTasksNamed('testRemoveTasksMatching1a')) == 1
+            assert len(tm.getTasksNamed('testRemoveTasksMatching2a')) == 1
+            tm.removeTasksMatching('testRemoveTasksMatching?a')
+            assert len(tm.getTasksNamed('testRemoveTasksMatching1a')) == 0
+            assert len(tm.getTasksNamed('testRemoveTasksMatching2a')) == 0
+            _testRemoveTasksMatching = None
+            tm._checkMemLeaks()
+
+            # create Task object and add to mgr
+            l = []
+            def _testTaskObj(task, l=l):
+                l.append(None)
+                return task.cont
+            t = Task(_testTaskObj)
+            tm.add(t, 'testTaskObj')
+            tm.step()
+            assert len(l) == 1
+            tm.step()
+            assert len(l) == 2
+            tm.remove('testTaskObj')
+            tm.step()
+            assert len(l) == 2
+            _testTaskObj = None
+            tm._checkMemLeaks()
+
+            # remove Task via task.remove()
+            l = []
+            def _testTaskObjRemove(task, l=l):
+                l.append(None)
+                return task.cont
+            t = Task(_testTaskObjRemove)
+            tm.add(t, 'testTaskObjRemove')
+            tm.step()
+            assert len(l) == 1
+            tm.step()
+            assert len(l) == 2
+            t.remove()
+            tm.step()
+            assert len(l) == 2
+            del t
+            _testTaskObjRemove = None
+            tm._checkMemLeaks()
+
+            """
+            # this test fails, and it's not clear what the correct behavior should be.
+            # sort passed to Task.__init__ is always overridden by taskMgr.add()
+            # even if no sort is specified, and calling Task.setSort() has no
+            # effect on the taskMgr's behavior.
+            # set/get Task sort
+            l = []
+            def _testTaskObjSort(arg, task, l=l):
+                l.append(arg)
+                return task.cont
+            t1 = Task(_testTaskObjSort, sort=1)
+            t2 = Task(_testTaskObjSort, sort=2)
+            tm.add(t1, 'testTaskObjSort1', extraArgs=['a',], appendTask=True)
+            tm.add(t2, 'testTaskObjSort2', extraArgs=['b',], appendTask=True)
+            tm.step()
+            assert len(l) == 2
+            assert l == ['a', 'b']
+            assert t1.getSort() == 1
+            assert t2.getSort() == 2
+            t1.setSort(3)
+            assert t1.getSort() == 3
+            tm.step()
+            assert len(l) == 4
+            assert l == ['a', 'b', 'b', 'a',]
+            t1.remove()
+            t2.remove()
+            tm.step()
+            assert len(l) == 4
+            del t1
+            del t2
+            _testTaskObjSort = None
+            tm._checkMemLeaks()
+            """
+
+            del l
+            tm.destroy()
+            del tm

+ 13 - 4
direct/src/task/TaskOrig.py

@@ -631,6 +631,9 @@ class TaskManager:
             # Alert the world, a new task is born!
             messenger.send('TaskManager-spawnDoLater',
                            sentArgs = [task, task.name, task.id])
+
+        if task.owner:
+            task.owner._addTask(task)
         return task
 
     def add(self, funcOrTask, name, priority=None, sort=None, extraArgs=None, uponDeath=None,
@@ -679,6 +682,8 @@ class TaskManager:
             self.nameDict[name] = [task]
         # Put it on the list for the end of this frame
         self.__addPendingTask(task)
+        if task.owner:
+            task.owner._addTask(task)
         return task
 
     def __addPendingTask(self, task):
@@ -1618,6 +1623,8 @@ class TaskManager:
 
             # task owner
             class _TaskOwner:
+                def _addTask(self, task):
+                    self.addedTaskName = task.name
                 def _clearTask(self, task):
                     self.clearedTaskName = task.name
             to = _TaskOwner()
@@ -1626,8 +1633,8 @@ class TaskManager:
                 return done
             tm.add(_testOwner, 'testOwner', owner=to)
             tm.step()
-            assert hasattr(to, 'clearedTaskName')
-            assert to.clearedTaskName == 'testOwner'
+            assert getattr(to, 'addedTaskName', None) == 'testOwner'
+            assert getattr(to, 'clearedTaskName', None) == 'testOwner'
             _testOwner = None
             del to
             _TaskOwner = None
@@ -1748,6 +1755,8 @@ class TaskManager:
 
             # doLater owner
             class _DoLaterOwner:
+                def _addTask(self, task):
+                    self.addedTaskName = task.name
                 def _clearTask(self, task):
                     self.clearedTaskName = task.name
             doLaterOwner = _DoLaterOwner()
@@ -1757,8 +1766,8 @@ class TaskManager:
             def _monitorDoLaterOwner(task, tm=tm, l=l, doLaterOwner=doLaterOwner,
                                      doLaterTests=doLaterTests):
                 if task.time > .02:
-                    assert hasattr(doLaterOwner, 'clearedTaskName')
-                    assert doLaterOwner.clearedTaskName == 'testDoLaterOwner'
+                    assert getattr(doLaterOwner, 'addedTaskName', None) == 'testDoLaterOwner'
+                    assert getattr(doLaterOwner, 'clearedTaskName', None) == 'testDoLaterOwner'
                     doLaterTests[0] -= 1
                     return task.done
                 return task.cont

+ 11 - 0
panda/src/event/asyncTask.I

@@ -138,6 +138,17 @@ clear_name() {
   set_name(string());
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTask::get_task_id
+//       Access: Public
+//  Description: Returns a number guaranteed to be unique for each
+//               different AsyncTask object in the universe.
+////////////////////////////////////////////////////////////////////
+INLINE AtomicAdjust::Integer AsyncTask::
+get_task_id() const {
+  return _task_id;
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTask::get_task_chain
 //       Access: Published

+ 55 - 1
panda/src/event/asyncTask.cxx

@@ -15,7 +15,11 @@
 #include "asyncTask.h"
 #include "asyncTaskManager.h"
 #include "config_event.h"
+#include "pt_Event.h"
+#include "throw_event.h"
+#include "eventParameter.h"
 
+AtomicAdjust::Integer AsyncTask::_next_task_id;
 PStatCollector AsyncTask::_show_code_pcollector("App:Show code");
 TypeHandle AsyncTask::_type_handle;
 
@@ -26,6 +30,7 @@ TypeHandle AsyncTask::_type_handle;
 ////////////////////////////////////////////////////////////////////
 AsyncTask::
 AsyncTask(const string &name) : 
+  _chain_name("default"),
   _delay(0.0),
   _has_delay(false),
   _wake_time(0.0),
@@ -44,6 +49,15 @@ AsyncTask(const string &name) :
   _python_object = NULL;
 #endif  // HAVE_PYTHON
   set_name(name);
+
+  // Carefully copy _next_task_id and increment it so that we get a
+  // unique ID.
+  AtomicAdjust::Integer current_id = _next_task_id;
+  while (AtomicAdjust::compare_and_exchange(_next_task_id, current_id, current_id + 1) != current_id) {
+    current_id = _next_task_id;
+  }
+
+  _task_id = current_id;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -161,7 +175,7 @@ set_task_chain(const string &chain_name) {
 
         AsyncTaskChain *chain_b = manager->do_find_task_chain(_chain_name);
         if (chain_b == (AsyncTaskChain *)NULL) {
-          event_cat.warning()
+          task_cat.warning()
             << "Creating implicit AsyncTaskChain " << _chain_name
             << " for " << manager->get_type() << " "
             << manager->get_name() << "\n";
@@ -338,3 +352,43 @@ AsyncTask::DoneStatus AsyncTask::
 do_task() {
   return DS_done;
 }
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTask::upon_birth
+//       Access: Protected, Virtual
+//  Description: Override this function to do something useful when the
+//               task has been added to the active queue.
+//
+//               This function is called with the lock held.  You may
+//               temporarily release it if necessary, but be sure to
+//               return with it held.
+////////////////////////////////////////////////////////////////////
+void AsyncTask::
+upon_birth() {
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTask::upon_death
+//       Access: Protected, Virtual
+//  Description: Override this function to do something useful when the
+//               task has been removed from the active queue.  The
+//               parameter clean_exit is true if the task has been
+//               removed because it exited normally (returning
+//               DS_done), or false if it was removed for some other
+//               reason (e.g. AsyncTaskManager::remove()).
+//
+//               The normal behavior is to throw the done_event only
+//               if clean_exit is true.
+//
+//               This function is called with the lock held.  You may
+//               temporarily release it if necessary, but be sure to
+//               return with it held.
+////////////////////////////////////////////////////////////////////
+void AsyncTask::
+upon_death(bool clean_exit) {
+  if (clean_exit && !_done_event.empty()) {
+    PT_Event event = new Event(_done_event);
+    event->add_parameter(EventParameter(this));
+    throw_event(event);
+  }
+}

+ 7 - 0
panda/src/event/asyncTask.h

@@ -80,6 +80,8 @@ PUBLISHED:
   void set_name(const string &name);
   INLINE void clear_name();
 
+  INLINE AtomicAdjust::Integer get_task_id() const;
+
   void set_task_chain(const string &chain_name);
   INLINE const string &get_task_chain() const;
 
@@ -106,8 +108,11 @@ PUBLISHED:
 protected:
   DoneStatus unlock_and_do_task();
   virtual DoneStatus do_task();
+  virtual void upon_birth();
+  virtual void upon_death(bool clean_exit);
 
 protected:
+  AtomicAdjust::Integer _task_id;
   string _chain_name;
   double _delay;
   bool _has_delay;
@@ -126,6 +131,8 @@ protected:
   double _total_dt;
   int _num_frames;
 
+  static AtomicAdjust::Integer _next_task_id;
+
   static PStatCollector _show_code_pcollector;
   PStatCollector _task_pcollector;
 

+ 133 - 36
panda/src/event/asyncTaskChain.cxx

@@ -15,9 +15,6 @@
 #include "asyncTaskChain.h"
 #include "asyncTaskManager.h"
 #include "event.h"
-#include "pt_Event.h"
-#include "throw_event.h"
-#include "eventParameter.h"
 #include "mutexHolder.h"
 #include "indent.h"
 #include "pStatClient.h"
@@ -48,7 +45,7 @@ AsyncTaskChain(AsyncTaskManager *manager, const string &name) :
   _num_busy_threads(0),
   _num_tasks(0),
   _state(S_initial),
-  _current_sort(INT_MAX),
+  _current_sort(-INT_MAX),
   _needs_cleanup(false),
   _current_frame(0),
   _time_in_frame(0.0)
@@ -365,6 +362,19 @@ poll() {
   do_poll();
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::get_next_wake_time
+//       Access: Published
+//  Description: Returns the scheduled time (on the manager's clock)
+//               of the next sleeping task in this task chain to
+//               awaken.  Returns -1 if there are no sleeping tasks.
+////////////////////////////////////////////////////////////////////
+double AsyncTaskChain::
+get_next_wake_time() const {
+  MutexHolder holder(_manager->_lock);
+  return do_get_next_wake_time();
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskChain::output
 //       Access: Published, Virtual
@@ -373,9 +383,7 @@ poll() {
 void AsyncTaskChain::
 output(ostream &out) const {
   MutexHolder holder(_manager->_lock);
-
-  out << get_type() << " " << get_name()
-      << "; " << _num_tasks << " tasks";
+  do_output(out);
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -420,6 +428,7 @@ do_add(AsyncTask *task) {
   if (task->has_delay()) {
     // This is a deferred task.  Add it to the sleeping queue.
     task->_wake_time = now + task->get_delay();
+    task->_start_time = task->_wake_time;
     task->_state = AsyncTask::S_sleeping;
     _sleeping.push_back(task);
     push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
@@ -427,7 +436,13 @@ do_add(AsyncTask *task) {
   } else {
     // This is an active task.  Add it to the active set.
     task->_state = AsyncTask::S_active;
-    if (task->get_sort() > _current_sort) {
+    if (task_cat.is_spam()) {
+      task_cat.spam()
+        << "Adding " << *task << " with sort " << task->get_sort()
+        << " to chain " << get_name() << " with current_sort "
+        << _current_sort << "\n";
+    }
+    if (task->get_sort() >= _current_sort) {
       // It will run this frame.
       _active.push_back(task);
       push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
@@ -439,6 +454,8 @@ do_add(AsyncTask *task) {
   ++_num_tasks;
   ++(_manager->_num_tasks);
   _needs_cleanup = true;
+
+  task->upon_birth();
   _cvar.signal_all();
 }
 
@@ -541,21 +558,30 @@ void AsyncTaskChain::
 do_cleanup() {
   do_stop_threads();
 
+  // Move aside the task lists first.  We must do this before we start
+  // iterating, because cleanup_task() might release the lock,
+  // allowing the iterators to become invalid.
+
+  TaskHeap active, next_active, sleeping;
+  active.swap(_active);
+  next_active.swap(_next_active);
+  sleeping.swap(_sleeping);
+
+  _needs_cleanup = false;
+
   TaskHeap::const_iterator ti;
-  for (ti = _active.begin(); ti != _active.end(); ++ti) {
+  for (ti = active.begin(); ti != active.end(); ++ti) {
     AsyncTask *task = (*ti);
     cleanup_task(task, false);
   }
-  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
+  for (ti = next_active.begin(); ti != next_active.end(); ++ti) {
     AsyncTask *task = (*ti);
     cleanup_task(task, false);
   }
-  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
+  for (ti = sleeping.begin(); ti != sleeping.end(); ++ti) {
     AsyncTask *task = (*ti);
     cleanup_task(task, false);
   }
-
-  _needs_cleanup = false;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -600,6 +626,9 @@ find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const {
 //               and restores it to the end of the queue.  This is
 //               called internally only within one of the task
 //               threads.  Assumes the lock is already held.
+//
+//               Note that the lock may be temporarily released by
+//               this method.
 ////////////////////////////////////////////////////////////////////
 void AsyncTaskChain::
 service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
@@ -612,6 +641,12 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
       thread->_servicing = task;
     }
 
+    if (task_cat.is_spam()) {
+      task_cat.spam()
+        << "Servicing " << *task << " in " << *Thread::get_current_thread()
+        << "\n";
+    }
+
     nassertv(task->get_sort() == _current_sort);
     nassertv(task->_state == AsyncTask::S_active);
     task->_state = AsyncTask::S_servicing;
@@ -650,6 +685,11 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
             task->_state = AsyncTask::S_sleeping;
             _sleeping.push_back(task);
             push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
+            if (task_cat.is_spam()) {
+              task_cat.spam()
+                << "Sleeping " << *task << ", wake time at " 
+                << task->get_wake_time() - now << "\n";
+            }
             _cvar.signal_all();
           }
           break;
@@ -678,10 +718,14 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
 //  Description: Called internally when a task has completed (or been
 //               interrupted) and is about to be removed from the
 //               active queue.  Assumes the lock is held.
+//
+//               Note that the lock may be temporarily released by
+//               this method.
 ////////////////////////////////////////////////////////////////////
 void AsyncTaskChain::
 cleanup_task(AsyncTask *task, bool clean_exit) {
   nassertv(task->_chain == this);
+  PT(AsyncTask) hold_task = task;
 
   task->_state = AsyncTask::S_inactive;
   task->_chain = NULL;
@@ -690,12 +734,7 @@ cleanup_task(AsyncTask *task, bool clean_exit) {
   --(_manager->_num_tasks);
 
   _manager->remove_task_by_name(task);
-
-  if (clean_exit && !task->_done_event.empty()) {
-    PT_Event event = new Event(task->_done_event);
-    event->add_parameter(EventParameter(task));
-    throw_event(event);
-  }
+  task->upon_death(clean_exit);
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -723,7 +762,18 @@ finish_sort_group() {
   }
 
   // There are no more tasks in this epoch; advance to the next epoch.
+  if (task_cat.is_spam()) {
+    do_output(task_cat.spam());
+    task_cat.spam(false)
+      << ": next epoch\n";
+  }
+
   if (_tick_clock) {
+    if (task_cat.is_spam()) {
+      do_output(task_cat.spam());
+      task_cat.spam(false)
+        << ": tick clock\n";
+    }
     _manager->_clock->tick();
     _manager->_frame_cvar.signal_all();
   }
@@ -737,24 +787,40 @@ finish_sort_group() {
   double now = _manager->_clock->get_frame_time();
   while (!_sleeping.empty() && _sleeping.front()->get_wake_time() <= now) {
     PT(AsyncTask) task = _sleeping.front();
+    if (task_cat.is_spam()) {
+      task_cat.spam()
+        << "Waking " << *task << ", wake time at " 
+        << task->get_wake_time() - now << "\n";
+    }
     pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
     _sleeping.pop_back();
     task->_state = AsyncTask::S_active;
     _active.push_back(task);
   }
 
+  if (task_cat.is_spam()) {
+    if (_sleeping.empty()) {
+      task_cat.spam()
+        << "No more tasks on sleeping queue.\n";
+    } else {
+      task_cat.spam()
+        << "Next sleeper: " << *_sleeping.front() << ", wake time at " 
+        << _sleeping.front()->get_wake_time() - now << "\n";
+    }
+  }
+
   make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
   nassertr(_num_tasks == _active.size() + _sleeping.size(), true);
 
+  _current_sort = -INT_MAX;
+
   if (!_active.empty()) {
-    // Get the first task on the queue.
-    _current_sort = _active.front()->get_sort();
+    // Signal the threads to start executing the first task again.
     _cvar.signal_all();
     return true;
   }
 
   // There are no tasks to be had anywhere.  Chill.
-  _current_sort = INT_MAX;
   return false;
 }
 
@@ -807,11 +873,7 @@ do_start_threads() {
       _threads.reserve(_num_threads);
       for (int i = 0; i < _num_threads; ++i) {
         ostringstream strm;
-        strm << _manager->get_name();
-        if (has_name()) {
-          strm << "_" << get_name();
-        }
-        strm << "_" << i;
+        strm << _manager->get_name() << "_" << get_name() << "_" << i;
         PT(AsyncTaskChainThread) thread = new AsyncTaskChainThread(strm.str(), this);
         if (thread->start(_thread_priority, true)) {
           _threads.push_back(thread);
@@ -906,6 +968,37 @@ do_poll() {
   finish_sort_group();
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_get_next_wake_time
+//       Access: Protected
+//  Description: Implements get_next_wake_time(); assumes the lock is held.
+////////////////////////////////////////////////////////////////////
+double AsyncTaskChain::
+do_get_next_wake_time() const {
+  if (!_sleeping.empty()) {
+    PT(AsyncTask) task = _sleeping.front();
+    return task->_wake_time;
+  }
+  return -1.0;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_output
+//       Access: Protected
+//  Description: The private implementation of output(), this assumes
+//               the lock is already held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+do_output(ostream &out) const {
+  if (_manager != (AsyncTaskManager *)NULL) {
+    out << _manager->get_type() << " " << _manager->get_name();
+  } else {
+    out << "(no manager)";
+  }
+  out << " task chain " << get_name()
+      << "; " << _num_tasks << " tasks";
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskChain::do_write
 //       Access: Protected
@@ -914,15 +1007,19 @@ do_poll() {
 ////////////////////////////////////////////////////////////////////
 void AsyncTaskChain::
 do_write(ostream &out, int indent_level) const {
-  if (has_name()) {
-    indent(out, indent_level)
-      << get_name();
-    if (_num_threads > 0) {
-      out << ", " << _num_threads << " threads";
-    }
-    out << "\n";
-  } else if (_num_threads > 0) {
-    out << "Default task chain, " << _num_threads << " threads\n";
+  indent(out, indent_level)
+    << "Task chain \"" << get_name() << "\"\n";
+  if (_num_threads > 0) {
+    indent(out, indent_level + 2) 
+      << _num_threads << " threads, priority " << _thread_priority << "\n";
+  }
+  if (_frame_budget >= 0.0) {
+    indent(out, indent_level + 2) 
+      << "frame budget " << _frame_budget << " s\n";
+  }
+  if (_tick_clock) {
+    indent(out, indent_level + 2) 
+      << "tick clock\n";
   }
 
   static const size_t buffer_size = 1024;

+ 12 - 1
panda/src/event/asyncTaskChain.h

@@ -88,6 +88,7 @@ PUBLISHED:
   AsyncTaskCollection get_sleeping_tasks() const;
 
   void poll();
+  double get_next_wake_time() const;
 
   virtual void output(ostream &out) const;
   virtual void write(ostream &out, int indent_level = 0) const;
@@ -112,6 +113,8 @@ protected:
   AsyncTaskCollection do_get_active_tasks() const;
   AsyncTaskCollection do_get_sleeping_tasks() const;
   void do_poll();
+  double do_get_next_wake_time() const;
+  void do_output(ostream &out) const;
   void do_write(ostream &out, int indent_level) const;
 
   void write_task_line(ostream &out, int indent_level, AsyncTask *task, double now) const;
@@ -139,7 +142,10 @@ protected:
       if (a->get_sort() != b->get_sort()) {
         return a->get_sort() > b->get_sort();
       }
-      return a->get_priority() < b->get_priority();
+      if (a->get_priority() != b->get_priority()) {
+        return a->get_priority() < b->get_priority();
+      }
+      return a->get_start_time() > b->get_start_time();
     }
   };
 
@@ -198,6 +204,11 @@ private:
   friend class AsyncTaskManager;
 };
 
+INLINE ostream &operator << (ostream &out, const AsyncTaskChain &chain) {
+  chain.output(out);
+  return out;
+};
+
 #include "asyncTaskChain.I"
 
 #endif

+ 99 - 29
panda/src/event/asyncTaskManager.cxx

@@ -39,7 +39,7 @@ AsyncTaskManager(const string &name) :
   _frame_cvar(_lock)
 {
   // Make a default task chain.
-  do_make_task_chain("");
+  do_make_task_chain("default");
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -49,6 +49,17 @@ AsyncTaskManager(const string &name) :
 ////////////////////////////////////////////////////////////////////
 AsyncTaskManager::
 ~AsyncTaskManager() {
+  cleanup();
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::cleanup
+//       Access: Published
+//  Description: Stops all threads and messily empties the task list.
+//               This is intended to be called on destruction only.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskManager::
+cleanup() {
   MutexHolder holder(_lock);
 
   TaskChains::iterator tci;
@@ -58,6 +69,9 @@ AsyncTaskManager::
     AsyncTaskChain *chain = (*tci);
     chain->do_cleanup();
   }
+
+  _task_chains.clear();
+  nassertv(_num_tasks == 0 && _tasks_by_name.empty());
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -134,7 +148,7 @@ remove_task_chain(const string &name) {
 
   while (chain->_num_tasks != 0) {
     // Still has tasks.
-    event_cat.info()
+    task_cat.info()
       << "Waiting for tasks on chain " << name << " to finish.\n";
     chain->do_wait_for_tasks();
   }
@@ -156,6 +170,11 @@ void AsyncTaskManager::
 add(AsyncTask *task) {
   MutexHolder holder(_lock);
 
+  if (task_cat.is_debug()) {
+    task_cat.debug()
+      << "Adding " << *task << "\n";
+  }
+  
   if (task->_state == AsyncTask::S_servicing_removed) {
     if (task->_manager == this) {
       // Re-adding a self-removed task; this just means clearing the
@@ -171,7 +190,7 @@ add(AsyncTask *task) {
 
   AsyncTaskChain *chain = do_find_task_chain(task->_chain_name);
   if (chain == (AsyncTaskChain *)NULL) {
-    event_cat.warning()
+    task_cat.warning()
       << "Creating implicit AsyncTaskChain " << task->_chain_name
       << " for " << get_type() << " " << get_name() << "\n";
     chain = do_make_task_chain(task->_chain_name);
@@ -309,8 +328,17 @@ remove(const AsyncTaskCollection &tasks) {
       nassertr(!do_has_task(task), num_removed);
     } else {
       nassertr(task->_chain->_manager == this, num_removed);
+      if (task_cat.is_debug()) {
+        task_cat.debug()
+          << "Removing " << *task << "\n";
+      }
       if (task->_chain->do_remove(task)) {
         ++num_removed;
+      } else {
+        if (task_cat.is_debug()) {
+          task_cat.debug()
+            << "  (unable to remove " << *task << ")\n";
+        }
       }
     }
   }
@@ -475,6 +503,39 @@ poll() {
   _frame_cvar.signal_all();
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::get_next_wake_time
+//       Access: Published
+//  Description: Returns the scheduled time (on the manager's clock)
+//               of the next sleeping task, on any task chain, to
+//               awaken.  Returns -1 if there are no sleeping tasks.
+////////////////////////////////////////////////////////////////////
+double AsyncTaskManager::
+get_next_wake_time() const {
+  MutexHolder holder(_lock);
+
+  bool got_any = false;
+  double next_wake_time = -1.0;
+
+  TaskChains::const_iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    double time = chain->do_get_next_wake_time();
+    if (time >= 0.0) {
+      if (!got_any) {
+        got_any = true;
+        next_wake_time = time;
+      } else {
+        next_wake_time = min(time, next_wake_time);
+      }
+    }
+  }
+
+  return next_wake_time;
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskManager::output
 //       Access: Published, Virtual
@@ -483,9 +544,7 @@ poll() {
 void AsyncTaskManager::
 output(ostream &out) const {
   MutexHolder holder(_lock);
-
-  out << get_type() << " " << get_name()
-      << "; " << _num_tasks << " tasks";
+  do_output(out);
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -509,29 +568,6 @@ write(ostream &out, int indent_level) const {
   }
 }
 
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::do_has_task
-//       Access: Protected
-//  Description: Returns true if the task is on one of the task lists,
-//               false if it is not (false may mean that the task is
-//               currently being serviced).  Assumes the lock is
-//               currently held.
-////////////////////////////////////////////////////////////////////
-bool AsyncTaskManager::
-do_has_task(AsyncTask *task) const {
-  TaskChains::const_iterator tci;
-  for (tci = _task_chains.begin();
-       tci != _task_chains.end();
-       ++tci) {
-    AsyncTaskChain *chain = (*tci);
-    if (chain->do_has_task(task)) {
-      return true;
-    }
-  }
-
-  return false;
-}
-
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskManager::do_make_task_chain
 //       Access: Protected
@@ -599,3 +635,37 @@ remove_task_by_name(AsyncTask *task) {
     nassertv(false);
   }
 }
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::do_has_task
+//       Access: Protected
+//  Description: Returns true if the task is on one of the task lists,
+//               false if it is not (false may mean that the task is
+//               currently being serviced).  Assumes the lock is
+//               currently held.
+////////////////////////////////////////////////////////////////////
+bool AsyncTaskManager::
+do_has_task(AsyncTask *task) const {
+  TaskChains::const_iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    if (chain->do_has_task(task)) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::do_output
+//       Access: Protected, Virtual
+//  Description: The private implementation of output(); assumes the lock is held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskManager::
+do_output(ostream &out) const {
+  out << get_type() << " " << get_name()
+      << "; " << _num_tasks << " tasks";
+}

+ 10 - 0
panda/src/event/asyncTaskManager.h

@@ -58,6 +58,8 @@ PUBLISHED:
   AsyncTaskManager(const string &name);
   BLOCKING virtual ~AsyncTaskManager();
 
+  BLOCKING void cleanup();
+
   INLINE void set_clock(ClockObject *clock);
   INLINE ClockObject *get_clock();
 
@@ -88,6 +90,7 @@ PUBLISHED:
   AsyncTaskCollection get_sleeping_tasks() const;
 
   void poll();
+  double get_next_wake_time() const;
 
   virtual void output(ostream &out) const;
   virtual void write(ostream &out, int indent_level = 0) const;
@@ -101,6 +104,8 @@ protected:
 
   bool do_has_task(AsyncTask *task) const;
 
+  virtual void do_output(ostream &out) const;
+
 protected:
   class AsyncTaskSortName {
   public:
@@ -146,6 +151,11 @@ private:
   friend class AsyncTask;
 };
 
+INLINE ostream &operator << (ostream &out, const AsyncTaskManager &manager) {
+  manager.output(out);
+  return out;
+};
+
 #include "asyncTaskManager.I"
 
 #endif

+ 1 - 0
panda/src/event/config_event.cxx

@@ -27,6 +27,7 @@
 
 Configure(config_event);
 NotifyCategoryDef(event, "");
+NotifyCategoryDef(task, "");
 
 ConfigureFn(config_event) {
   AsyncTask::init_type();

+ 1 - 0
panda/src/event/config_event.h

@@ -20,5 +20,6 @@
 #include "notifyCategoryProxy.h"
 
 NotifyCategoryDecl(event, EXPCL_PANDA_EVENT, EXPTP_PANDA_EVENT);
+NotifyCategoryDecl(task, EXPCL_PANDA_EVENT, EXPTP_PANDA_EVENT);
 
 #endif

+ 183 - 2
panda/src/event/pythonTask.cxx

@@ -35,9 +35,13 @@ PythonTask(PyObject *function, const string &name) :
 {
   _function = NULL;
   _args = NULL;
+  _upon_death = NULL;
+  _owner = NULL;
 
   set_function(function);
   set_args(Py_None, true);
+  set_upon_death(Py_None);
+  set_owner(Py_None);
 
   _dict = PyDict_New();
 
@@ -154,6 +158,74 @@ get_args() {
   }
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: PythonTask::set_upon_death
+//       Access: Published
+//  Description: Replaces the function that is called when the task
+//               finishes.  The parameter should be a Python callable
+//               object.
+////////////////////////////////////////////////////////////////////
+void PythonTask::
+set_upon_death(PyObject *upon_death) {
+  Py_XDECREF(_upon_death);
+
+  _upon_death = upon_death;
+  Py_INCREF(_upon_death);
+  if (_upon_death != Py_None && !PyCallable_Check(_upon_death)) {
+    nassert_raise("Invalid upon_death function passed to PythonTask");
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: PythonTask::get_upon_death
+//       Access: Published
+//  Description: Returns the function that is called when the task
+//               finishes.
+////////////////////////////////////////////////////////////////////
+PyObject *PythonTask::
+get_upon_death() {
+  Py_INCREF(_upon_death);
+  return _upon_death;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: PythonTask::set_owner
+//       Access: Published
+//  Description: Specifies a Python object that serves as the "owner"
+//               for the task.  This owner object must have two
+//               methods: _addTask() and _clearTask(), which will be
+//               called with one parameter, the task object.
+//
+//               owner._addTask() is called when the task is added
+//               into the active task list, and owner._clearTask() is
+//               called when it is removed.
+////////////////////////////////////////////////////////////////////
+void PythonTask::
+set_owner(PyObject *owner) {
+  if (_owner != NULL && _owner != Py_None && _state != S_inactive) {
+    upon_death(false);
+  }
+
+  Py_XDECREF(_owner);
+  _owner = owner;
+  Py_INCREF(_owner);
+
+  if (_owner != Py_None && _state != S_inactive) {
+    upon_birth();
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: PythonTask::get_owner
+//       Access: Published
+//  Description: Returns the "owner" object.  See set_owner().
+////////////////////////////////////////////////////////////////////
+PyObject *PythonTask::
+get_owner() {
+  Py_INCREF(_owner);
+  return _owner;
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: PythonTask::__setattr__
 //       Access: Published
@@ -165,9 +237,29 @@ get_args() {
 ////////////////////////////////////////////////////////////////////
 int PythonTask::
 __setattr__(const string &attr_name, PyObject *v) {
+  if (task_cat.is_debug()) {
+    PyObject *str = PyObject_Repr(v);
+    task_cat.debug() 
+      << *this << ": task." << attr_name << " = "
+      << PyString_AsString(str) << "\n";
+    Py_DECREF(str);
+  }
+
   if (attr_name == "delayTime") {
     double delay = PyFloat_AsDouble(v);
-    set_delay(delay);
+    if (!PyErr_Occurred()) {
+      set_delay(delay);
+    }
+
+  } else if (attr_name == "name") {
+    char *name = PyString_AsString(v);
+    if (name != (char *)NULL) {
+      set_name(name);
+    }
+
+  } else if (attr_name == "id") {
+    nassert_raise("Cannot set constant value");
+    return true;
 
   } else {
     return PyDict_SetItemString(_dict, attr_name.c_str(), v);
@@ -209,6 +301,10 @@ __getattr__(const string &attr_name) const {
     return PyInt_FromLong(DS_cont);
   } else if (attr_name == "again") {
     return PyInt_FromLong(DS_again);
+  } else if (attr_name == "name") {
+    return PyString_FromString(get_name().c_str());
+  } else if (attr_name == "id") {
+    return PyInt_FromLong(_task_id);
   } else {
     return PyMapping_GetItemString(_dict, (char *)attr_name.c_str());
   }
@@ -227,7 +323,7 @@ do_task() {
   Py_DECREF(args);
 
   if (result == (PyObject *)NULL) {
-    event_cat.error()
+    task_cat.error()
       << "Exception occurred in " << *this << "\n";
     return DS_abort;
   }
@@ -270,4 +366,89 @@ do_task() {
   return DS_abort;
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: PythonTask::upon_birth
+//       Access: Protected, Virtual
+//  Description: Override this function to do something useful when the
+//               task has been added to the active queue.
+//
+//               This function is called with the lock held.  You may
+//               temporarily release it if necessary, but be sure to
+//               return with it held.
+////////////////////////////////////////////////////////////////////
+void PythonTask::
+upon_birth() {
+  AsyncTask::upon_birth();
+  call_owner_method("_addTask");
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: PythonTask::upon_death
+//       Access: Protected, Virtual
+//  Description: Override this function to do something useful when the
+//               task has been removed from the active queue.  The
+//               parameter clean_exit is true if the task has been
+//               removed because it exited normally (returning
+//               DS_done), or false if it was removed for some other
+//               reason (e.g. AsyncTaskManager::remove()).
+//
+//               The normal behavior is to throw the done_event only
+//               if clean_exit is true.
+//
+//               This function is called with the lock held.  You may
+//               temporarily release it if necessary, but be sure to
+//               return with it held.
+////////////////////////////////////////////////////////////////////
+void PythonTask::
+upon_death(bool clean_exit) {
+  AsyncTask::upon_death(clean_exit);
+  call_owner_method("_clearTask");
+  call_function(_upon_death);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: PythonTask::call_owner_method
+//       Access: Protected
+//  Description: Calls the indicated method name on the given object,
+//               if defined, passing in the task object as the only
+//               parameter.
+////////////////////////////////////////////////////////////////////
+void PythonTask::
+call_owner_method(const char *method_name) {
+  if (_owner != Py_None) {
+    PyObject *func = PyObject_GetAttrString(_owner, (char *)method_name);
+    if (func == (PyObject *)NULL) {
+      PyObject *str = PyObject_Repr(_owner);
+      task_cat.error() 
+        << "Owner object " << PyString_AsString(str) << " added to "
+        << *this << " has no method " << method_name << "().\n";
+      Py_DECREF(str);
+
+    } else {
+      call_function(func);
+      Py_DECREF(func);
+    }
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: PythonTask::call_function
+//       Access: Protected
+//  Description: Calls the indicated Python function, passing in the
+//               task object as the only parameter.
+////////////////////////////////////////////////////////////////////
+void PythonTask::
+call_function(PyObject *function) {
+  if (function != Py_None) {
+    PyObject *self = 
+      DTool_CreatePyInstanceTyped(this, Dtool_TypedReferenceCount,
+                                  true, false, get_type_index());
+    PyObject *args = PyTuple_Pack(1, self);
+    
+    PyObject *result = PyObject_CallObject(function, args);
+    Py_XDECREF(result);
+    Py_DECREF(args);
+  }
+}
+
 #endif  // HAVE_PYTHON

+ 13 - 0
panda/src/event/pythonTask.h

@@ -37,17 +37,30 @@ PUBLISHED:
   void set_args(PyObject *args, bool append_task);
   PyObject *get_args();
 
+  void set_upon_death(PyObject *upon_death);
+  PyObject *get_upon_death();
+
+  void set_owner(PyObject *owner);
+  PyObject *get_owner();
+
   int __setattr__(const string &attr_name, PyObject *v);
   int __setattr__(const string &attr_name);
   PyObject *__getattr__(const string &attr_name) const;
 
 protected:
   virtual DoneStatus do_task();
+  virtual void upon_birth();
+  virtual void upon_death(bool clean_exit);
+
+  void call_owner_method(const char *method_name);
+  void call_function(PyObject *function);
 
 private:
   PyObject *_function;
   PyObject *_args;
   bool _append_task;
+  PyObject *_upon_death;
+  PyObject *_owner;
   PyObject *_dict;
 
 public:

+ 1 - 1
panda/src/event/test_task.cxx

@@ -50,7 +50,7 @@ static const int num_threads = 10;
 int
 main(int argc, char *argv[]) {
   PT(AsyncTaskManager) task_mgr = new AsyncTaskManager("task_mgr");
-  PT(AsyncTaskChain) chain = task_mgr->make_task_chain("");
+  PT(AsyncTaskChain) chain = task_mgr->make_task_chain("default");
   chain->set_tick_clock(true);
   chain->set_num_threads(num_threads);
 

+ 2 - 1
panda/src/pipeline/Sources.pp

@@ -107,7 +107,8 @@
     threadPosixImpl.cxx \
     threadSimpleImpl.cxx \
     threadSimpleManager.cxx \
-    threadWin32Impl.cxx
+    threadWin32Impl.cxx \
+    threadPriority.cxx
 
   #define INSTALL_HEADERS  \
     contextSwitch.h \

+ 1 - 0
panda/src/pipeline/pipeline_composite2.cxx

@@ -21,3 +21,4 @@
 #include "threadSimpleImpl.cxx"
 #include "threadSimpleManager.cxx"
 #include "threadWin32Impl.cxx"
+#include "threadPriority.cxx"

+ 37 - 0
panda/src/pipeline/threadPriority.cxx

@@ -0,0 +1,37 @@
+// Filename: threadPriority.cxx
+// Created by:  drose (26Sep08)
+//
+////////////////////////////////////////////////////////////////////
+//
+// PANDA 3D SOFTWARE
+// Copyright (c) Carnegie Mellon University.  All rights reserved.
+//
+// All use of this software is subject to the terms of the revised BSD
+// license.  You should have received a copy of this license along
+// with this source code in a file named "LICENSE."
+//
+////////////////////////////////////////////////////////////////////
+
+#include "threadPriority.h"
+
+ostream &
+operator << (ostream &out, ThreadPriority pri) {
+  switch (pri) {
+  case TP_low:
+    return out << "low";
+
+  case TP_normal:
+    return out << "normal";
+
+  case TP_high:
+    return out << "high";
+
+  case TP_urgent:
+    return out << "urgent";
+  }
+
+  pipeline_cat->error()
+    << "Invalid ThreadPriority value: " << (int)pri << "\n";
+  nassertr(false, out);
+  return out;
+}

+ 3 - 0
panda/src/pipeline/threadPriority.h

@@ -30,5 +30,8 @@ enum ThreadPriority {
 };
 END_PUBLISH
 
+EXPCL_PANDA_PIPELINE ostream &
+operator << (ostream &out, ThreadPriority pri);
+
 
 #endif