Browse Source

task chains

David Rose 17 years ago
parent
commit
bf86abd1c5

+ 3 - 0
panda/src/event/Sources.pp

@@ -9,6 +9,7 @@
   
   #define SOURCES \
     asyncTask.h asyncTask.I \
+    asyncTaskChain.h asyncTaskChain.I \
     asyncTaskCollection.h asyncTaskCollection.I \
     asyncTaskManager.h asyncTaskManager.I \
     config_event.h \
@@ -24,6 +25,7 @@
     
   #define INCLUDED_SOURCES \
     asyncTask.cxx \
+    asyncTaskChain.cxx \
     asyncTaskCollection.cxx \
     asyncTaskManager.cxx \
     buttonEvent.cxx \
@@ -37,6 +39,7 @@
 
   #define INSTALL_HEADERS \
     asyncTask.h asyncTask.I \
+    asyncTaskChain.h asyncTaskChain.I \
     asyncTaskCollection.h asyncTaskCollection.I \
     asyncTaskManager.h asyncTaskManager.I \
     buttonEvent.I buttonEvent.h \

+ 15 - 1
panda/src/event/asyncTask.I

@@ -28,7 +28,8 @@ AsyncTask(const string &name) :
   _priority(0),
   _state(S_inactive),
   _servicing_thread(NULL),
-  _manager(NULL)
+  _manager(NULL),
+  _chain(NULL)
 {
 #ifdef HAVE_PYTHON
   _python_object = NULL;
@@ -160,6 +161,19 @@ clear_name() {
   set_name(string());
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTask::get_task_chain
+//       Access: Published
+//  Description: Returns the AsyncTaskChain on which this task will
+//               be running.  Each task chain runs tasks independently
+//               of the others.
+////////////////////////////////////////////////////////////////////
+INLINE const string &AsyncTask::
+get_task_chain() const {
+  return _chain_name;
+}
+
+
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTask::get_sort
 //       Access: Published

+ 51 - 4
panda/src/event/asyncTask.cxx

@@ -14,6 +14,7 @@
 
 #include "asyncTask.h"
 #include "asyncTaskManager.h"
+#include "config_event.h"
 
 TypeHandle AsyncTask::_type_handle;
 
@@ -24,7 +25,7 @@ TypeHandle AsyncTask::_type_handle;
 ////////////////////////////////////////////////////////////////////
 AsyncTask::
 ~AsyncTask() {
-  nassertv(_state == S_inactive && _manager == NULL);
+  nassertv(_state == S_inactive && _manager == NULL && _chain == NULL);
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -61,7 +62,7 @@ get_elapsed_time() const {
 
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTask::set_name
-//       Access: Public
+//       Access: Published
 //  Description:
 ////////////////////////////////////////////////////////////////////
 void AsyncTask::
@@ -83,6 +84,52 @@ set_name(const string &name) {
   }
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTask::set_task_chain
+//       Access: Published
+//  Description: Specifies the AsyncTaskChain on which this task will
+//               be running.  Each task chain runs tasks independently
+//               of the others.
+////////////////////////////////////////////////////////////////////
+void AsyncTask::
+set_task_chain(const string &chain_name) {
+  if (chain_name != _chain_name) {
+    if (_manager != (AsyncTaskManager *)NULL) {
+      MutexHolder holder(_manager->_lock);
+      if (_state == S_active) {
+        // Changing chains on an "active" (i.e. enqueued) task means
+        // removing it and re-inserting it into the queue.
+        PT(AsyncTask) hold_task = this;
+        PT(AsyncTaskManager) manager = _manager;
+
+        AsyncTaskChain *chain_a = manager->do_find_task_chain(_chain_name);
+        nassertv(chain_a != (AsyncTaskChain *)NULL);
+        chain_a->do_remove(this);
+        _chain_name = chain_name;
+
+        AsyncTaskChain *chain_b = manager->do_find_task_chain(_chain_name);
+        if (chain_b == (AsyncTaskChain *)NULL) {
+          event_cat.warning()
+            << "Creating implicit AsyncTaskChain " << _chain_name
+            << " for " << manager->get_type() << " "
+            << manager->get_name() << "\n";
+          chain_b = manager->do_make_task_chain(_chain_name);
+        }
+        chain_b->do_add(this);
+
+      } else {
+        // If it's sleeping, currently being serviced, or something
+        // else, we can just change the chain_name value directly.
+        _chain_name = chain_name;
+      }
+    } else {
+      // If it hasn't been started anywhere, we can just change the
+      // chain_name value.
+      _chain_name = chain_name;
+    }
+  }
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTask::set_sort
 //       Access: Published
@@ -104,7 +151,7 @@ set_sort(int sort) {
   if (sort != _sort) {
     if (_manager != (AsyncTaskManager *)NULL) {
       MutexHolder holder(_manager->_lock);
-      if (_state == S_active && _sort >= _manager->_current_sort) {
+      if (_state == S_active && _sort >= _chain->_current_sort) {
         // Changing sort on an "active" (i.e. enqueued) task means
         // removing it and re-inserting it into the queue.
         PT(AsyncTask) hold_task = this;
@@ -143,7 +190,7 @@ set_priority(int priority) {
   if (priority != _priority) {
     if (_manager != (AsyncTaskManager *)NULL) {
       MutexHolder holder(_manager->_lock);
-      if (_state == S_active && _sort >= _manager->_current_sort) {
+      if (_state == S_active && _sort >= _chain->_current_sort) {
         // Changing priority on an "active" (i.e. enqueued) task means
         // removing it and re-inserting it into the queue.
         PT(AsyncTask) hold_task = this;

+ 7 - 0
panda/src/event/asyncTask.h

@@ -31,6 +31,7 @@
 #endif  // HAVE_PYTHON
 
 class AsyncTaskManager;
+class AsyncTaskChain;
 
 ////////////////////////////////////////////////////////////////////
 //       Class : AsyncTask
@@ -78,6 +79,9 @@ PUBLISHED:
   void set_name(const string &name);
   INLINE void clear_name();
 
+  void set_task_chain(const string &chain_name);
+  INLINE const string &get_task_chain() const;
+
   void set_sort(int sort);
   INLINE int get_sort() const;
 
@@ -98,6 +102,7 @@ protected:
   virtual DoneStatus do_task();
 
 protected:
+  string _chain_name;
   double _delay;
   bool _has_delay;
   double _wake_time;
@@ -108,6 +113,7 @@ protected:
   State _state;
   Thread *_servicing_thread;
   AsyncTaskManager *_manager;
+  AsyncTaskChain *_chain;
 
 private:
 #ifdef HAVE_PYTHON
@@ -132,6 +138,7 @@ private:
   static TypeHandle _type_handle;
 
   friend class AsyncTaskManager;
+  friend class AsyncTaskChain;
 };
 
 INLINE ostream &operator << (ostream &out, const AsyncTask &task) {

+ 49 - 0
panda/src/event/asyncTaskChain.I

@@ -0,0 +1,49 @@
+// Filename: asyncTaskChain.I
+// Created by:  drose (23Aug06)
+//
+////////////////////////////////////////////////////////////////////
+//
+// PANDA 3D SOFTWARE
+// Copyright (c) Carnegie Mellon University.  All rights reserved.
+//
+// All use of this software is subject to the terms of the revised BSD
+// license.  You should have received a copy of this license along
+// with this source code in a file named "LICENSE."
+//
+////////////////////////////////////////////////////////////////////
+
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::is_started
+//       Access: Published
+//  Description: Returns true if the thread(s) have been started and
+//               are ready to service requests, false otherwise.  If
+//               this is false, the next call to add() or add_and_do()
+//               will automatically start the threads.
+////////////////////////////////////////////////////////////////////
+INLINE bool AsyncTaskChain::
+is_started() const {
+  return (_state == S_started);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::set_tick_clock
+//       Access: Published
+//  Description: Sets the tick_clock flag.  When this is true,
+//               get_clock()->tick() will be called automatically at
+//               each task epoch.  This is false by default.
+////////////////////////////////////////////////////////////////////
+INLINE void AsyncTaskChain::
+set_tick_clock(bool clock) {
+  _tick_clock = clock;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::get_tick_clock
+//       Access: Published
+//  Description: Returns the tick_clock flag..  See set_tick_clock().
+////////////////////////////////////////////////////////////////////
+INLINE bool AsyncTaskChain::
+get_tick_clock() const {
+  return _tick_clock;
+}

+ 929 - 0
panda/src/event/asyncTaskChain.cxx

@@ -0,0 +1,929 @@
+// Filename: asyncTaskChain.cxx
+// Created by:  drose (23Aug06)
+//
+////////////////////////////////////////////////////////////////////
+//
+// PANDA 3D SOFTWARE
+// Copyright (c) Carnegie Mellon University.  All rights reserved.
+//
+// All use of this software is subject to the terms of the revised BSD
+// license.  You should have received a copy of this license along
+// with this source code in a file named "LICENSE."
+//
+////////////////////////////////////////////////////////////////////
+
+#include "asyncTaskChain.h"
+#include "asyncTaskManager.h"
+#include "event.h"
+#include "pt_Event.h"
+#include "throw_event.h"
+#include "eventParameter.h"
+#include "mutexHolder.h"
+#include "indent.h"
+#include "pStatClient.h"
+#include "pStatTimer.h"
+#include "clockObject.h"
+#include <algorithm>
+
+TypeHandle AsyncTaskChain::_type_handle;
+
+PStatCollector AsyncTaskChain::_task_pcollector("Task");
+PStatCollector AsyncTaskChain::_wait_pcollector("Wait");
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::Constructor
+//       Access: Published
+//  Description:
+////////////////////////////////////////////////////////////////////
+AsyncTaskChain::
+AsyncTaskChain(AsyncTaskManager *manager, const string &name) :
+  Namable(name),
+  _manager(manager),
+  _tick_clock(false),
+  _num_threads(0),
+  _cvar(manager->_lock),
+  _num_tasks(0),
+  _num_busy_threads(0),
+  _state(S_initial),
+  _current_sort(INT_MAX),
+  _needs_cleanup(false)
+{
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::Destructor
+//       Access: Published, Virtual
+//  Description:
+////////////////////////////////////////////////////////////////////
+AsyncTaskChain::
+~AsyncTaskChain() {
+  // We only grab the lock if _needs_cleanup is true.  This way, the
+  // temporary AsyncTaskChain objects created (and destructed) within
+  // the task manager won't risk a double-lock.
+  if (_needs_cleanup) {
+    MutexHolder holder(_manager->_lock);
+    do_cleanup();
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::set_num_threads
+//       Access: Published
+//  Description: Changes the number of threads for this task chain.
+//               This may require stopping the threads if they are
+//               already running.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+set_num_threads(int num_threads) {
+  nassertv(num_threads >= 0);
+
+  if (!Thread::is_threading_supported()) {
+    num_threads = 0;
+  }
+
+  MutexHolder holder(_manager->_lock);
+  if (_num_threads != num_threads) {
+    do_stop_threads();
+    _num_threads = num_threads;
+
+    if (_num_tasks != 0) {
+      do_start_threads();
+    }
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::get_num_threads
+//       Access: Published
+//  Description: Returns the number of threads that will be servicing
+//               tasks for this chain.  Also see
+//               get_num_running_threads().
+////////////////////////////////////////////////////////////////////
+int AsyncTaskChain::
+get_num_threads() const {
+  MutexHolder holder(_manager->_lock);
+  return _num_threads;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::get_num_running_threads
+//       Access: Published
+//  Description: Returns the number of threads that have been created
+//               and are actively running.  This will return 0 before
+//               the threads have been started; it will also return 0
+//               if thread support is not available.
+////////////////////////////////////////////////////////////////////
+int AsyncTaskChain::
+get_num_running_threads() const {
+  MutexHolder holder(_manager->_lock);
+  return _threads.size();
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::stop_threads
+//       Access: Published
+//  Description: Stops any threads that are currently running.  If any
+//               tasks are still pending and have not yet been picked
+//               up by a thread, they will not be serviced unless
+//               poll() or start_threads() is later called.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+stop_threads() {
+  if (_state == S_started || _state == S_aborting) {
+    // Clean up all of the threads.
+    MutexHolder holder(_manager->_lock);
+    do_stop_threads();
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::start_threads
+//       Access: Published
+//  Description: Starts any requested threads to service the tasks on
+//               the queue.  This is normally not necessary, since
+//               adding a task will start the threads automatically.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+start_threads() {
+  if (_state == S_initial || _state == S_aborting) {
+    MutexHolder holder(_manager->_lock);
+    do_start_threads();
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::has_task
+//       Access: Published
+//  Description: Returns true if the indicated task has been added to
+//               this AsyncTaskChain, false otherwise.
+////////////////////////////////////////////////////////////////////
+bool AsyncTaskChain::
+has_task(AsyncTask *task) const {
+  MutexHolder holder(_manager->_lock);
+
+  if (task->_chain != this) {
+    nassertr(!do_has_task(task), false);
+    return false;
+  }
+
+  if (task->_state == AsyncTask::S_servicing_removed) {
+    return false;
+  }
+
+  // The task might not actually be in the active queue, since it
+  // might be being serviced right now.  That's OK.
+  return true;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::wait_for_tasks
+//       Access: Published
+//  Description: Blocks until the task list is empty.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+wait_for_tasks() {
+  MutexHolder holder(_manager->_lock);
+  do_wait_for_tasks();
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::get_num_tasks
+//       Access: Published
+//  Description: Returns the number of tasks that are currently active
+//               or sleeping within the task chain.
+////////////////////////////////////////////////////////////////////
+int AsyncTaskChain::
+get_num_tasks() const {
+  MutexHolder holder(_manager->_lock);
+  return _num_tasks;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::get_tasks
+//       Access: Published
+//  Description: Returns the set of tasks that are active or sleeping
+//               on the task chain, at the time of the call.
+////////////////////////////////////////////////////////////////////
+AsyncTaskCollection AsyncTaskChain::
+get_tasks() const {
+  MutexHolder holder(_manager->_lock);
+  AsyncTaskCollection result = do_get_active_tasks();
+  result.add_tasks_from(do_get_sleeping_tasks());
+  return result;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::get_active_tasks
+//       Access: Published
+//  Description: Returns the set of tasks that are active (and not
+//               sleeping) on the task chain, at the time of the
+//               call.
+////////////////////////////////////////////////////////////////////
+AsyncTaskCollection AsyncTaskChain::
+get_active_tasks() const {
+  MutexHolder holder(_manager->_lock);
+  return do_get_active_tasks();
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::get_sleeping_tasks
+//       Access: Published
+//  Description: Returns the set of tasks that are sleeping (and not
+//               active) on the task chain, at the time of the
+//               call.
+////////////////////////////////////////////////////////////////////
+AsyncTaskCollection AsyncTaskChain::
+get_sleeping_tasks() const {
+  MutexHolder holder(_manager->_lock);
+  return do_get_sleeping_tasks();
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::poll
+//       Access: Published
+//  Description: Runs through all the tasks in the task list, once, if
+//               the task chain is running in single-threaded mode
+//               (no threads available).  This method does nothing in
+//               threaded mode, so it may safely be called in either
+//               case.
+//
+//               Normally, you would not call this function directly;
+//               instead, call AsyncTaskManager::poll(), which polls
+//               all of the task chains in sequence.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+poll() {
+  MutexHolder holder(_manager->_lock);
+  do_poll();
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::output
+//       Access: Published, Virtual
+//  Description: 
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+output(ostream &out) const {
+  MutexHolder holder(_manager->_lock);
+
+  out << get_type() << " " << get_name()
+      << "; " << _num_tasks << " tasks";
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::write
+//       Access: Published, Virtual
+//  Description: 
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+write(ostream &out, int indent_level) const {
+  MutexHolder holder(_manager->_lock);
+  do_write(out, indent_level);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_add
+//       Access: Protected
+//  Description: Adds the indicated task to the active queue.  It is
+//               an error if the task is already added to this or any
+//               other active queue.
+//
+//               This is normally called only by the AsyncTaskManager.
+//               Assumes the lock is already held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+do_add(AsyncTask *task) {
+  nassertv(task->_chain == NULL &&
+           task->_manager == NULL &&
+           task->_chain_name == get_name() &&
+           task->_state == AsyncTask::S_inactive);
+  nassertv(!do_has_task(task));
+
+  do_start_threads();
+
+  task->_chain = this;
+  task->_manager = _manager;
+
+  double now = _manager->_clock->get_frame_time();
+  task->_start_time = now;
+
+  if (task->has_delay()) {
+    // This is a deferred task.  Add it to the sleeping queue.
+    task->_wake_time = now + task->get_delay();
+    task->_state = AsyncTask::S_sleeping;
+    _sleeping.push_back(task);
+    push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
+
+  } else {
+    // This is an active task.  Add it to the active set.
+    task->_state = AsyncTask::S_active;
+    if (task->get_sort() > _current_sort) {
+      // It will run this frame.
+      _active.push_back(task);
+      push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
+    } else {
+      // It will run next frame.
+      _next_active.push_back(task);
+    }
+  }
+  ++_num_tasks;
+  ++(_manager->_num_tasks);
+  _needs_cleanup = true;
+  _cvar.signal_all();
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_remove
+//       Access: Protected
+//  Description: Removes the indicated task from this chain.  Returns
+//               true if removed, false otherwise.  Assumes the lock
+//               is already held.
+////////////////////////////////////////////////////////////////////
+bool AsyncTaskChain::
+do_remove(AsyncTask *task) {
+  bool removed = false;
+
+  nassertr(task->_chain == this, false);
+
+  switch (task->_state) {
+  case AsyncTask::S_servicing:
+    // This task is being serviced.
+    task->_state = AsyncTask::S_servicing_removed;
+    removed = true;
+    break;
+    
+  case AsyncTask::S_servicing_removed:
+    // Being serviced, though it will be removed later.
+    break;
+    
+  case AsyncTask::S_sleeping:
+    // Sleeping, easy.
+    {
+      int index = find_task_on_heap(_sleeping, task);
+      nassertr(index != -1, false);
+      _sleeping.erase(_sleeping.begin() + index);
+      make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
+      removed = true;
+      cleanup_task(task, false);
+    }
+    break;
+    
+  case AsyncTask::S_active:
+    {
+      // Active, but not being serviced, easy.
+      int index = find_task_on_heap(_active, task);
+      if (index != -1) {
+        _active.erase(_active.begin() + index);
+        make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
+      } else {
+        index = find_task_on_heap(_next_active, task);
+        nassertr(index != -1, false);
+        _next_active.erase(_next_active.begin() + index);
+      }
+      removed = true;
+      cleanup_task(task, false);
+    }
+  }
+
+  return removed;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_wait_for_tasks
+//       Access: Protected
+//  Description: Blocks until the task list is empty.  Assumes the
+//               lock is held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+do_wait_for_tasks() {
+  do_start_threads();
+
+  if (_threads.empty()) {
+    // Non-threaded case.
+    while (_num_tasks > 0) {
+      if (_state == S_shutdown || _state == S_aborting) {
+        return;
+      }
+      do_poll();
+    }
+
+  } else {
+    // Threaded case.
+    while (_num_tasks > 0) {
+      if (_state == S_shutdown || _state == S_aborting) {
+        return;
+      }
+      
+      PStatTimer timer(_wait_pcollector);
+      _cvar.wait();
+    }
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_cleanup
+//       Access: Protected
+//  Description: Stops all threads and messily empties the task list.
+//               This is intended to be called on destruction only.
+//               Assumes the lock is already held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+do_cleanup() {
+  do_stop_threads();
+
+  TaskHeap::const_iterator ti;
+  for (ti = _active.begin(); ti != _active.end(); ++ti) {
+    AsyncTask *task = (*ti);
+    cleanup_task(task, false);
+  }
+  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
+    AsyncTask *task = (*ti);
+    cleanup_task(task, false);
+  }
+  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
+    AsyncTask *task = (*ti);
+    cleanup_task(task, false);
+  }
+
+  _needs_cleanup = false;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_has_task
+//       Access: Protected
+//  Description: Returns true if the task is on one of the task lists,
+//               false if it is not (false may mean that the task is
+//               currently being serviced).  Assumes the lock is
+//               currently held.
+////////////////////////////////////////////////////////////////////
+bool AsyncTaskChain::
+do_has_task(AsyncTask *task) const {
+  return (find_task_on_heap(_active, task) != -1 ||
+          find_task_on_heap(_next_active, task) != -1 ||
+          find_task_on_heap(_sleeping, task) != -1);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::find_task_on_heap
+//       Access: Protected
+//  Description: Returns the index number of the indicated task within
+//               the specified task list, or -1 if the task is not
+//               found in the list (this may mean that it is currently
+//               being serviced).  Assumes that the lock is currently
+//               held.
+////////////////////////////////////////////////////////////////////
+int AsyncTaskChain::
+find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const {
+  for (int i = 0; i < (int)heap.size(); ++i) {
+    if (heap[i] == task) {
+      return i;
+    }
+  }
+
+  return -1;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::service_one_task
+//       Access: Protected
+//  Description: Pops a single task off the active queue, services it,
+//               and restores it to the end of the queue.  This is
+//               called internally only within one of the task
+//               threads.  Assumes the lock is already held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
+  if (!_active.empty()) {
+    PT(AsyncTask) task = _active.front();
+    pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
+    _active.pop_back();
+
+    if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) {
+      thread->_servicing = task;
+    }
+
+    nassertv(task->get_sort() == _current_sort);
+    nassertv(task->_state == AsyncTask::S_active);
+    task->_state = AsyncTask::S_servicing;
+    task->_servicing_thread = thread;
+
+    // Now release the chain lock while we actually service the
+    // task.
+    _manager->_lock.release();
+    AsyncTask::DoneStatus ds = task->do_task();
+
+    // Now we have to re-acquire the chain lock, so we can put the
+    // task back on the queue (and so we can return with the lock
+    // still held).
+    _manager->_lock.lock();
+
+    if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) {
+      thread->_servicing = NULL;
+    }
+    task->_servicing_thread = NULL;
+
+    if (task->_chain == this) {
+      // TODO: check task->_chain_name to see if the task wants to
+      // jump chains.
+
+      if (task->_state == AsyncTask::S_servicing_removed) {
+        // This task wants to kill itself.
+        cleanup_task(task, false);
+
+      } else {
+        switch (ds) {
+        case AsyncTask::DS_cont:
+          // The task is still alive; put it on the next frame's active
+          // queue.
+          task->_state = AsyncTask::S_active;
+          _next_active.push_back(task);
+          _cvar.signal_all();
+          break;
+          
+        case AsyncTask::DS_again:
+          // The task wants to sleep again.
+          {
+            double now = _manager->_clock->get_frame_time();
+            task->_wake_time = now + task->get_delay();
+            task->_state = AsyncTask::S_sleeping;
+            _sleeping.push_back(task);
+            push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
+            _cvar.signal_all();
+          }
+          break;
+
+        case AsyncTask::DS_abort:
+          // The task had an exception and wants to raise a big flag.
+          cleanup_task(task, false);
+          if (_state == S_started) {
+            _state = S_aborting;
+            _cvar.signal_all();
+          }
+          break;
+          
+        default:
+          // The task has finished.
+          cleanup_task(task, true);
+        }
+      }
+    }
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::cleanup_task
+//       Access: Protected
+//  Description: Called internally when a task has completed (or been
+//               interrupted) and is about to be removed from the
+//               active queue.  Assumes the lock is held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+cleanup_task(AsyncTask *task, bool clean_exit) {
+  nassertv(task->_chain == this);
+
+  task->_state = AsyncTask::S_inactive;
+  task->_chain = NULL;
+  task->_manager = NULL;
+  --_num_tasks;
+  --(_manager->_num_tasks);
+
+  _manager->remove_task_by_name(task);
+
+  if (clean_exit && !task->_done_event.empty()) {
+    PT_Event event = new Event(task->_done_event);
+    event->add_parameter(EventParameter(task));
+    throw_event(event);
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::finish_sort_group
+//       Access: Protected
+//  Description: Called internally when all tasks of a given sort
+//               value have been completed, and it is time to
+//               increment to the next sort value, or begin the next
+//               epoch.  Assumes the lock is held.
+//
+//               Returns true if there are more tasks on the queue
+//               after this operation, or false if the task list is
+//               empty and we need to wait.
+////////////////////////////////////////////////////////////////////
+bool AsyncTaskChain::
+finish_sort_group() {
+  nassertr(_num_busy_threads == 0, true);
+
+  if (!_active.empty()) {
+    // There are more tasks; just set the next sort value.
+    nassertr(_current_sort < _active.front()->get_sort(), true);
+    _current_sort = _active.front()->get_sort();
+    _cvar.signal_all();
+    return true;
+  }
+
+  // There are no more tasks in this epoch; advance to the next epoch.
+  if (_tick_clock) {
+    _manager->_clock->tick();
+  }
+  if (!_threads.empty()) {
+    PStatClient::thread_tick(get_name());
+  }
+    
+  _active.swap(_next_active);
+
+  // Check for any sleeping tasks that need to be woken.
+  double now = _manager->_clock->get_frame_time();
+  while (!_sleeping.empty() && _sleeping.front()->get_wake_time() <= now) {
+    PT(AsyncTask) task = _sleeping.front();
+    pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
+    _sleeping.pop_back();
+    task->_state = AsyncTask::S_active;
+    _active.push_back(task);
+  }
+
+  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
+  nassertr(_num_tasks == _active.size() + _sleeping.size(), true);
+
+  if (!_active.empty()) {
+    // Get the first task on the queue.
+    _current_sort = _active.front()->get_sort();
+    _cvar.signal_all();
+    return true;
+  }
+
+  // There are no tasks to be had anywhere.  Chill.
+  _current_sort = INT_MAX;
+  return false;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_stop_threads
+//       Access: Protected
+//  Description: The private implementation of stop_threads; assumes
+//               the lock is already held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+do_stop_threads() {
+  if (_state == S_started || _state == S_aborting) {
+    _state = S_shutdown;
+    _cvar.signal_all();
+    
+    Threads wait_threads;
+    wait_threads.swap(_threads);
+    
+    // We have to release the lock while we join, so the threads can
+    // wake up and see that we're shutting down.
+    _manager->_lock.release();
+    Threads::iterator ti;
+    for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
+      (*ti)->join();
+    }
+    _manager->_lock.lock();
+    
+    _state = S_initial;
+    nassertv(_num_busy_threads == 0);
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_start_threads
+//       Access: Protected
+//  Description: The private implementation of start_threads; assumes
+//               the lock is already held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+do_start_threads() {
+  if (_state == S_aborting) {
+    do_stop_threads();
+  }
+
+  if (_state == S_initial) {
+    _state = S_started;
+    _num_busy_threads = 0;
+    if (Thread::is_threading_supported()) {
+      _needs_cleanup = true;
+      _threads.reserve(_num_threads);
+      for (int i = 0; i < _num_threads; ++i) {
+        ostringstream strm;
+        strm << _manager->get_name();
+        if (has_name()) {
+          strm << "_" << get_name();
+        }
+        strm << "_" << i;
+        PT(AsyncTaskChainThread) thread = new AsyncTaskChainThread(strm.str(), this);
+        if (thread->start(TP_low, true)) {
+          _threads.push_back(thread);
+        }
+      }
+    }
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_get_active_tasks
+//       Access: Protected
+//  Description: Returns the set of tasks that are active (and not
+//               sleeping) on the task chain, at the time of the
+//               call.  Assumes the lock is held.
+////////////////////////////////////////////////////////////////////
+AsyncTaskCollection AsyncTaskChain::
+do_get_active_tasks() const {
+  AsyncTaskCollection result;
+
+  Threads::const_iterator thi;
+  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
+    AsyncTask *task = (*thi)->_servicing;
+    if (task != (AsyncTask *)NULL) {
+      result.add_task(task);
+    }
+  }
+  TaskHeap::const_iterator ti;
+  for (ti = _active.begin(); ti != _active.end(); ++ti) {
+    AsyncTask *task = (*ti);
+    result.add_task(task);
+  }
+  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
+    AsyncTask *task = (*ti);
+    result.add_task(task);
+  }
+
+  return result;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_get_sleeping_tasks
+//       Access: Published
+//  Description: Returns the set of tasks that are sleeping (and not
+//               active) on the task chain, at the time of the
+//               call.  Assumes the lock is held.
+////////////////////////////////////////////////////////////////////
+AsyncTaskCollection AsyncTaskChain::
+do_get_sleeping_tasks() const {
+  AsyncTaskCollection result;
+
+  TaskHeap::const_iterator ti;
+  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
+    AsyncTask *task = (*ti);
+    result.add_task(task);
+  }
+
+  return result;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_poll
+//       Access: Protected
+//  Description: The private implementation of poll(), this assumes
+//               the lock is already held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+do_poll() {
+  do_start_threads();
+
+  if (!_threads.empty()) {
+    return;
+  }
+  
+  while (!_active.empty() && _state != S_shutdown && _state != S_aborting) {
+    _current_sort = _active.front()->get_sort();
+    service_one_task(NULL);
+  }
+
+  if (_state != S_shutdown && _state != S_aborting) {
+    finish_sort_group();
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::do_write
+//       Access: Protected
+//  Description: The private implementation of write(), this assumes
+//               the lock is already held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+do_write(ostream &out, int indent_level) const {
+  indent(out, indent_level)
+    << get_type() << " " << get_name() << "\n";
+
+  // Collect a list of all active tasks, then sort them into order for
+  // output.
+  TaskHeap tasks = _active;
+  tasks.insert(tasks.end(), _next_active.begin(), _next_active.end());
+
+  Threads::const_iterator thi;
+  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
+    AsyncTask *task = (*thi)->_servicing;
+    if (task != (AsyncTask *)NULL && 
+        task->_state != AsyncTask::S_servicing_removed) {
+      tasks.push_back(task);
+    }
+  }
+
+  if (!tasks.empty()) {
+    indent(out, indent_level + 2)
+      << "Active tasks:\n";
+
+    sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority());
+
+    // Since AsyncTaskSortPriority() sorts backwards (because of STL's
+    // push_heap semantics), we go through the task list in reverse
+    // order to print them forwards.
+    TaskHeap::reverse_iterator ti;
+    int current_sort = tasks.back()->get_sort() - 1;
+    for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) {
+      AsyncTask *task = (*ti);
+      if (task->get_sort() != current_sort) {
+        current_sort = task->get_sort();
+        indent(out, indent_level + 2)
+          << "sort = " << current_sort << "\n";
+      }
+      if (task->_state == AsyncTask::S_servicing) {
+        indent(out, indent_level + 3)
+          << "*" << *task << "\n";
+      } else {
+        indent(out, indent_level + 4)
+          << *task << "\n";
+      }
+    }
+  }
+
+  if (!_sleeping.empty()) {
+    indent(out, indent_level + 2)
+      << "Sleeping tasks:\n";
+    double now = _manager->_clock->get_frame_time();
+
+    // Instead of iterating through the _sleeping list in heap order,
+    // copy it and then use repeated pops to get it out in sorted
+    // order, for the user's satisfaction.
+    TaskHeap sleeping = _sleeping;
+    while (!sleeping.empty()) {
+      PT(AsyncTask) task = sleeping.front();
+      pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime());
+      sleeping.pop_back();
+
+      indent(out, indent_level + 4)
+        << task->get_wake_time() - now << "s: "
+        << *task << "\n";
+    }
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::AsyncTaskChainThread::Constructor
+//       Access: Public
+//  Description: 
+////////////////////////////////////////////////////////////////////
+AsyncTaskChain::AsyncTaskChainThread::
+AsyncTaskChainThread(const string &name, AsyncTaskChain *chain) :
+  Thread(name, chain->get_name()),
+  _chain(chain),
+  _servicing(NULL)
+{
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::AsyncTaskChainThread::thread_main
+//       Access: Public, Virtual
+//  Description: 
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::AsyncTaskChainThread::
+thread_main() {
+  MutexHolder holder(_chain->_manager->_lock);
+  while (_chain->_state != S_shutdown && _chain->_state != S_aborting) {
+    if (!_chain->_active.empty() &&
+        _chain->_active.front()->get_sort() == _chain->_current_sort) {
+      PStatTimer timer(_task_pcollector);
+      _chain->_num_busy_threads++;
+      _chain->service_one_task(this);
+      _chain->_num_busy_threads--;
+      _chain->_cvar.signal_all();
+
+    } else {
+      // We've finished all the available tasks of the current sort
+      // value.  We can't pick up a new task until all of the threads
+      // finish the tasks with the same sort value.
+      if (_chain->_num_busy_threads == 0) {
+        // We're the last thread to finish.  Update _current_sort.
+        if (!_chain->finish_sort_group()) {
+          // Nothing to do.  Wait for more tasks to be added.
+          if (_chain->_sleeping.empty()) {
+            PStatTimer timer(_wait_pcollector);
+            _chain->_cvar.wait();
+          } else {
+            double wake_time = _chain->_sleeping.front()->get_wake_time();
+            double now = _chain->_manager->_clock->get_frame_time();
+            double timeout = max(wake_time - now, 0.0);
+            PStatTimer timer(_wait_pcollector);
+            _chain->_cvar.wait(timeout);
+          }            
+        }
+
+      } else {
+        // Wait for the other threads to finish their current task
+        // before we continue.
+        PStatTimer timer(_wait_pcollector);
+        _chain->_cvar.wait();
+      }
+    }
+  }
+}
+

+ 190 - 0
panda/src/event/asyncTaskChain.h

@@ -0,0 +1,190 @@
+// Filename: asyncTaskChain.h
+// Created by:  drose (23Aug06)
+//
+////////////////////////////////////////////////////////////////////
+//
+// PANDA 3D SOFTWARE
+// Copyright (c) Carnegie Mellon University.  All rights reserved.
+//
+// All use of this software is subject to the terms of the revised BSD
+// license.  You should have received a copy of this license along
+// with this source code in a file named "LICENSE."
+//
+////////////////////////////////////////////////////////////////////
+
+#ifndef ASYNCTASKCHAIN_H
+#define ASYNCTASKCHAIN_H
+
+#include "pandabase.h"
+
+#include "asyncTask.h"
+#include "asyncTaskCollection.h"
+#include "typedReferenceCount.h"
+#include "thread.h"
+#include "pmutex.h"
+#include "conditionVarFull.h"
+#include "pvector.h"
+#include "pdeque.h"
+#include "pStatCollector.h"
+#include "clockObject.h"
+
+class AsyncTaskManager;
+
+////////////////////////////////////////////////////////////////////
+//       Class : AsyncTaskChain
+// Description : The AsyncTaskChain is a subset of the
+//               AsyncTaskManager.  Each chain maintains a separate
+//               list of tasks, and will execute them with its own set
+//               of threads.  Each chain may thereby operate
+//               independently of the other chains.
+//
+//               The AsyncTaskChain will spawn a specified number of
+//               threads (possibly 0) to serve the tasks.  If there
+//               are no threads, you must call poll() from time to
+//               time to serve the tasks in the main thread.  Normally
+//               this is done by calling AsyncTaskManager::poll().
+//
+//               Each task will run exactly once each epoch.  Beyond
+//               that, the tasks' sort and priority values control the
+//               order in which they are run: tasks are run in
+//               increasing order by sort value, and within the same
+//               sort value, they are run roughly in decreasing order
+//               by priority value, with some exceptions for
+//               parallelism.  Tasks with different sort values are
+//               never run in parallel together, but tasks with
+//               different priority values might be (if there is more
+//               than one thread).
+////////////////////////////////////////////////////////////////////
+class EXPCL_PANDA_EVENT AsyncTaskChain : public TypedReferenceCount, public Namable {
+public:
+  AsyncTaskChain(AsyncTaskManager *manager, const string &name);
+  ~AsyncTaskChain();
+
+PUBLISHED:
+  INLINE void set_tick_clock(bool tick_clock);
+  INLINE bool get_tick_clock() const;
+
+  BLOCKING void set_num_threads(int num_threads);
+  int get_num_threads() const;
+  int get_num_running_threads() const;
+
+  BLOCKING void stop_threads();
+  void start_threads();
+  INLINE bool is_started() const;
+
+  bool has_task(AsyncTask *task) const;
+
+  BLOCKING void wait_for_tasks();
+
+  int get_num_tasks() const;
+  AsyncTaskCollection get_tasks() const;
+  AsyncTaskCollection get_active_tasks() const;
+  AsyncTaskCollection get_sleeping_tasks() const;
+
+  void poll();
+
+  virtual void output(ostream &out) const;
+  virtual void write(ostream &out, int indent_level = 0) const;
+
+protected:
+  class AsyncTaskChainThread;
+  typedef pvector< PT(AsyncTask) > TaskHeap;
+
+  void do_add(AsyncTask *task);
+  bool do_remove(AsyncTask *task);
+  void do_wait_for_tasks();
+  void do_cleanup();
+
+  bool do_has_task(AsyncTask *task) const;
+  int find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const;
+
+  void service_one_task(AsyncTaskChainThread *thread);
+  void cleanup_task(AsyncTask *task, bool clean_exit);
+  bool finish_sort_group();
+  void do_stop_threads();
+  void do_start_threads();
+  AsyncTaskCollection do_get_active_tasks() const;
+  AsyncTaskCollection do_get_sleeping_tasks() const;
+  void do_poll();
+  void do_write(ostream &out, int indent_level) const;
+
+protected:
+  class AsyncTaskChainThread : public Thread {
+  public:
+    AsyncTaskChainThread(const string &name, AsyncTaskChain *chain);
+    virtual void thread_main();
+
+    AsyncTaskChain *_chain;
+    AsyncTask *_servicing;
+  };
+
+  class AsyncTaskSortWakeTime {
+  public:
+    bool operator () (AsyncTask *a, AsyncTask *b) const {
+      return a->get_wake_time() > b->get_wake_time();
+    }
+  };
+  
+  class AsyncTaskSortPriority {
+  public:
+    bool operator () (AsyncTask *a, AsyncTask *b) const {
+      if (a->get_sort() != b->get_sort()) {
+        return a->get_sort() > b->get_sort();
+      }
+      return a->get_priority() < b->get_priority();
+    }
+  };
+
+  typedef pvector< PT(AsyncTaskChainThread) > Threads;
+
+  AsyncTaskManager *_manager;
+
+  ConditionVarFull _cvar;  // signaled when _active, _next_active, _sleeping, _state, or _current_sort changes, or a task finishes.
+
+  enum State {
+    S_initial,  // no threads yet
+    S_started,  // threads have been started
+    S_aborting, // task returned DS_abort, shutdown requested from sub-thread.
+    S_shutdown  // waiting for thread shutdown, requested from main thread
+  };
+
+  bool _tick_clock;
+  int _num_threads;
+  Threads _threads;
+  int _num_busy_threads;
+  int _num_tasks;
+  TaskHeap _active;
+  TaskHeap _next_active;
+  TaskHeap _sleeping;
+  State _state;
+  int _current_sort;
+  bool _needs_cleanup;
+  
+  static PStatCollector _task_pcollector;
+  static PStatCollector _wait_pcollector;
+
+public:
+  static TypeHandle get_class_type() {
+    return _type_handle;
+  }
+  static void init_type() {
+    TypedReferenceCount::init_type();
+    register_type(_type_handle, "AsyncTaskChain",
+                  TypedReferenceCount::get_class_type());
+  }
+  virtual TypeHandle get_type() const {
+    return get_class_type();
+  }
+  virtual TypeHandle force_init_type() {init_type(); return get_class_type();}
+
+private:
+  static TypeHandle _type_handle;
+
+  friend class AsyncTaskChainThread;
+  friend class AsyncTask;
+  friend class AsyncTaskManager;
+};
+
+#include "asyncTaskChain.I"
+
+#endif

+ 0 - 62
panda/src/event/asyncTaskManager.I

@@ -13,19 +13,6 @@
 ////////////////////////////////////////////////////////////////////
 
 
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::is_started
-//       Access: Published
-//  Description: Returns true if the thread(s) have been started and
-//               are ready to service requests, false otherwise.  If
-//               this is false, the next call to add() or add_and_do()
-//               will automatically start the threads.
-////////////////////////////////////////////////////////////////////
-INLINE bool AsyncTaskManager::
-is_started() const {
-  return (_state == S_started);
-}
-
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskManager::set_clock
 //       Access: Published
@@ -53,55 +40,6 @@ get_clock() {
   return _clock;
 }
 
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::set_tick_clock
-//       Access: Published
-//  Description: Sets the tick_clock flag.  When this is true,
-//               get_clock()->tick() will be called automatically at
-//               each task epoch.  This is false by default.
-////////////////////////////////////////////////////////////////////
-INLINE void AsyncTaskManager::
-set_tick_clock(bool clock) {
-  _tick_clock = clock;
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::get_tick_clock
-//       Access: Published
-//  Description: Returns the tick_clock flag..  See set_tick_clock().
-////////////////////////////////////////////////////////////////////
-INLINE bool AsyncTaskManager::
-get_tick_clock() const {
-  return _tick_clock;
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::get_num_threads
-//       Access: Published
-//  Description: Returns the number of threads that will be servicing
-//               tasks for this manager.  Also see
-//               get_num_running_threads().
-////////////////////////////////////////////////////////////////////
-INLINE int AsyncTaskManager::
-get_num_threads() const {
-  MutexHolder holder(_lock);
-  return _num_threads;
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::get_num_running_threads
-//       Access: Published
-//  Description: Returns the number of threads that have been created
-//               and are actively running.  This will return 0 before
-//               the threads have been started; it will also return 0
-//               if thread support is not available.
-////////////////////////////////////////////////////////////////////
-INLINE int AsyncTaskManager::
-get_num_running_threads() const {
-  MutexHolder holder(_lock);
-  return _threads.size();
-}
-
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskManager::get_num_tasks
 //       Access: Published

+ 221 - 556
panda/src/event/asyncTaskManager.cxx

@@ -22,31 +22,21 @@
 #include "pStatClient.h"
 #include "pStatTimer.h"
 #include "clockObject.h"
+#include "config_event.h"
 #include <algorithm>
 
 TypeHandle AsyncTaskManager::_type_handle;
 
-PStatCollector AsyncTaskManager::_task_pcollector("Task");
-PStatCollector AsyncTaskManager::_wait_pcollector("Wait");
-
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskManager::Constructor
 //       Access: Published
 //  Description:
 ////////////////////////////////////////////////////////////////////
 AsyncTaskManager::
-AsyncTaskManager(const string &name, int num_threads) :
+AsyncTaskManager(const string &name) :
   Namable(name),
-  _num_threads(0),
-  _cvar(_lock),
-  _num_tasks(0),
-  _num_busy_threads(0),
-  _state(S_initial),
-  _current_sort(INT_MAX),
-  _clock(ClockObject::get_global_clock()),
-  _tick_clock(false)
+  _clock(ClockObject::get_global_clock())
 {
-  set_num_threads(num_threads);
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -56,75 +46,100 @@ AsyncTaskManager(const string &name, int num_threads) :
 ////////////////////////////////////////////////////////////////////
 AsyncTaskManager::
 ~AsyncTaskManager() {
-  stop_threads();
+  MutexHolder holder(_lock);
 
-  TaskHeap::const_iterator ti;
-  for (ti = _active.begin(); ti != _active.end(); ++ti) {
-    AsyncTask *task = (*ti);
-    cleanup_task(task, false);
-  }
-  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
-    AsyncTask *task = (*ti);
-    cleanup_task(task, false);
-  }
-  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
-    AsyncTask *task = (*ti);
-    cleanup_task(task, false);
+  TaskChains::iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    chain->do_cleanup();
   }
 }
 
 ////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::set_num_threads
+//     Function: AsyncTaskManager::get_num_task_chains
 //       Access: Published
-//  Description: Changes the number of threads for this task manager.
-//               This may require stopping the threads if they are
-//               already running.
+//  Description: Returns the number of different task chains.
 ////////////////////////////////////////////////////////////////////
-void AsyncTaskManager::
-set_num_threads(int num_threads) {
-  nassertv(num_threads >= 0);
-
-  if (!Thread::is_threading_supported()) {
-    num_threads = 0;
-  }
-
+int AsyncTaskManager::
+get_num_task_chains() const {
   MutexHolder holder(_lock);
-  if (_num_threads != num_threads) {
-    do_stop_threads();
-    _num_threads = num_threads;
-  }
+  return _task_chains.size();
 }
 
 ////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::stop_threads
+//     Function: AsyncTaskManager::get_task_chain
 //       Access: Published
-//  Description: Stops any threads that are currently running.  If any
-//               tasks are still pending and have not yet been picked
-//               up by a thread, they will not be serviced unless
-//               poll() or start_threads() is later called.
+//  Description: Returns the nth task chain.
 ////////////////////////////////////////////////////////////////////
-void AsyncTaskManager::
-stop_threads() {
-  if (_state == S_started || _state == S_aborting) {
-    // Clean up all of the threads.
-    MutexHolder holder(_lock);
-    do_stop_threads();
-  }
+AsyncTaskChain *AsyncTaskManager::
+get_task_chain(int n) const {
+  MutexHolder holder(_lock);
+  nassertr(n >= 0 && n < (int)_task_chains.size(), NULL);
+  return _task_chains[n];
 }
 
 ////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::start_threads
+//     Function: AsyncTaskManager::make_task_chain
 //       Access: Published
-//  Description: Starts any requested threads to service the tasks on
-//               the queue.  This is normally not necessary, since
-//               adding a task will start the threads automatically.
+//  Description: Creates a new AsyncTaskChain of the indicated name
+//               and stores it within the AsyncTaskManager.  If a task
+//               chain with this name already exists, returns it
+//               instead.
 ////////////////////////////////////////////////////////////////////
-void AsyncTaskManager::
-start_threads() {
-  if (_state == S_initial || _state == S_aborting) {
-    MutexHolder holder(_lock);
-    do_start_threads();
+AsyncTaskChain *AsyncTaskManager::
+make_task_chain(const string &name) {
+  MutexHolder holder(_lock);
+  return do_make_task_chain(name);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::find_task_chain
+//       Access: Protected
+//  Description: Searches a new AsyncTaskChain of the indicated name
+//               and returns it if it exists, or NULL otherwise.
+////////////////////////////////////////////////////////////////////
+AsyncTaskChain *AsyncTaskManager::
+find_task_chain(const string &name) {
+  MutexHolder holder(_lock);
+  return do_find_task_chain(name);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::remove_task_chain
+//       Access: Protected
+//  Description: Removes the AsyncTaskChain of the indicated name.
+//               If the chain still has tasks, this will block until
+//               all tasks are finished.
+//
+//               Returns true if successful, or false if the chain did
+//               not exist.
+////////////////////////////////////////////////////////////////////
+bool AsyncTaskManager::
+remove_task_chain(const string &name) {
+  MutexHolder holder(_lock);
+
+  PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
+  TaskChains::iterator tci = _task_chains.find(chain);
+  if (tci == _task_chains.end()) {
+    // No chain.
+    return false;
   }
+
+  chain = (*tci);
+
+  while (chain->_num_tasks != 0) {
+    // Still has tasks.
+    event_cat.info()
+      << "Waiting for tasks on chain " << name << " to finish.\n";
+    chain->do_wait_for_tasks();
+  }
+
+  // Safe to remove.
+  chain->do_cleanup();
+  _task_chains.erase(tci);
+  return true;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -151,35 +166,16 @@ add(AsyncTask *task) {
            task->_state == AsyncTask::S_inactive);
   nassertv(!do_has_task(task));
 
-  do_start_threads();
-
-  task->_manager = this;
   add_task_by_name(task);
 
-  double now = _clock->get_frame_time();
-  task->_start_time = now;
-
-  if (task->has_delay()) {
-    // This is a deferred task.  Add it to the sleeping queue.
-    task->_wake_time = now + task->get_delay();
-    task->_state = AsyncTask::S_sleeping;
-    _sleeping.push_back(task);
-    push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
-
-  } else {
-    // This is an active task.  Add it to the active set.
-    task->_state = AsyncTask::S_active;
-    if (task->get_sort() > _current_sort) {
-      // It will run this frame.
-      _active.push_back(task);
-      push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
-    } else {
-      // It will run next frame.
-      _next_active.push_back(task);
-    }
+  AsyncTaskChain *chain = do_find_task_chain(task->_chain_name);
+  if (chain == (AsyncTaskChain *)NULL) {
+    event_cat.warning()
+      << "Creating implicit AsyncTaskChain " << task->_chain_name
+      << " for " << get_type() << " " << get_name() << "\n";
+    chain = do_make_task_chain(task->_chain_name);
   }
-  ++_num_tasks;
-  _cvar.signal_all();
+  chain->do_add(task);
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -311,43 +307,9 @@ remove(const AsyncTaskCollection &tasks) {
       // Not a member of this manager, or already removed.
       nassertr(!do_has_task(task), num_removed);
     } else {
-      switch (task->_state) {
-      case AsyncTask::S_servicing:
-        // This task is being serviced.
-        task->_state = AsyncTask::S_servicing_removed;
-        break;
-
-      case AsyncTask::S_servicing_removed:
-        // Being serviced, though it will be removed later.
-        break;
-      
-      case AsyncTask::S_sleeping:
-        // Sleeping, easy.
-        {
-          int index = find_task_on_heap(_sleeping, task);
-          nassertr(index != -1, num_removed);
-          _sleeping.erase(_sleeping.begin() + index);
-          make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
-          ++num_removed;
-          cleanup_task(task, false);
-        }
-        break;
-
-      case AsyncTask::S_active:
-        {
-          // Active, but not being serviced, easy.
-          int index = find_task_on_heap(_active, task);
-          if (index != -1) {
-            _active.erase(_active.begin() + index);
-            make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
-          } else {
-            index = find_task_on_heap(_next_active, task);
-            nassertr(index != -1, num_removed);
-            _next_active.erase(_next_active.begin() + index);
-          }
-          ++num_removed;
-          cleanup_task(task, false);
-        }
+      nassertr(task->_chain->_manager == this, num_removed);
+      if (task->_chain->do_remove(task)) {
+        ++num_removed;
       }
     }
   }
@@ -364,27 +326,56 @@ void AsyncTaskManager::
 wait_for_tasks() {
   MutexHolder holder(_lock);
 
-  do_start_threads();
-
-  if (_threads.empty()) {
-    // Non-threaded case.
-    while (_num_tasks > 0) {
-      if (_state == S_shutdown || _state == S_aborting) {
-        return;
-      }
-      do_poll();
+  // Wait for each of our task chains to finish.
+  while (_num_tasks > 0) {
+    TaskChains::iterator tci;
+    for (tci = _task_chains.begin();
+         tci != _task_chains.end();
+         ++tci) {
+      AsyncTaskChain *chain = (*tci);
+      chain->do_wait_for_tasks();
     }
+  }
+}
 
-  } else {
-    // Threaded case.
-    while (_num_tasks > 0) {
-      if (_state == S_shutdown || _state == S_aborting) {
-        return;
-      }
-      
-      PStatTimer timer(_wait_pcollector);
-      _cvar.wait();
-    }
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::stop_threads
+//       Access: Published
+//  Description: Stops any threads that are currently running.  If any
+//               tasks are still pending and have not yet been picked
+//               up by a thread, they will not be serviced unless
+//               poll() or start_threads() is later called.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskManager::
+stop_threads() {
+  MutexHolder holder(_lock);
+
+  TaskChains::iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    chain->do_stop_threads();
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::start_threads
+//       Access: Published
+//  Description: Starts any requested threads to service the tasks on
+//               the queue.  This is normally not necessary, since
+//               adding a task will start the threads automatically.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskManager::
+start_threads() {
+  MutexHolder holder(_lock);
+
+  TaskChains::iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    chain->do_start_threads();
   }
 }
 
@@ -395,9 +386,19 @@ wait_for_tasks() {
 //               on the task manager, at the time of the call.
 ////////////////////////////////////////////////////////////////////
 AsyncTaskCollection AsyncTaskManager::
-get_tasks() {
-  AsyncTaskCollection result = get_active_tasks();
-  result.add_tasks_from(get_sleeping_tasks());
+get_tasks() const {
+  MutexHolder holder(_lock);
+
+  AsyncTaskCollection result;
+  TaskChains::const_iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    result.add_tasks_from(chain->do_get_active_tasks());
+    result.add_tasks_from(chain->do_get_sleeping_tasks());
+  }
+
   return result;
 }
 
@@ -409,24 +410,16 @@ get_tasks() {
 //               call.
 ////////////////////////////////////////////////////////////////////
 AsyncTaskCollection AsyncTaskManager::
-get_active_tasks() {
-  AsyncTaskCollection result;
+get_active_tasks() const {
+  MutexHolder holder(_lock);
 
-  Threads::const_iterator thi;
-  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
-    AsyncTask *task = (*thi)->_servicing;
-    if (task != (AsyncTask *)NULL) {
-      result.add_task(task);
-    }
-  }
-  TaskHeap::const_iterator ti;
-  for (ti = _active.begin(); ti != _active.end(); ++ti) {
-    AsyncTask *task = (*ti);
-    result.add_task(task);
-  }
-  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
-    AsyncTask *task = (*ti);
-    result.add_task(task);
+  AsyncTaskCollection result;
+  TaskChains::const_iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    result.add_tasks_from(chain->do_get_active_tasks());
   }
 
   return result;
@@ -440,13 +433,16 @@ get_active_tasks() {
 //               call.
 ////////////////////////////////////////////////////////////////////
 AsyncTaskCollection AsyncTaskManager::
-get_sleeping_tasks() {
-  AsyncTaskCollection result;
+get_sleeping_tasks() const {
+  MutexHolder holder(_lock);
 
-  TaskHeap::const_iterator ti;
-  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
-    AsyncTask *task = (*ti);
-    result.add_task(task);
+  AsyncTaskCollection result;
+  TaskChains::const_iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    result.add_tasks_from(chain->do_get_sleeping_tasks());
   }
 
   return result;
@@ -464,13 +460,14 @@ get_sleeping_tasks() {
 void AsyncTaskManager::
 poll() {
   MutexHolder holder(_lock);
-  do_start_threads();
 
-  if (!_threads.empty()) {
-    return;
+  TaskChains::iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    chain->do_poll();
   }
-  
-  do_poll();
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -497,66 +494,12 @@ write(ostream &out, int indent_level) const {
   indent(out, indent_level)
     << get_type() << " " << get_name() << "\n";
 
-  // Collect a list of all active tasks, then sort them into order for
-  // output.
-  TaskHeap tasks = _active;
-  tasks.insert(tasks.end(), _next_active.begin(), _next_active.end());
-
-  Threads::const_iterator thi;
-  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
-    AsyncTask *task = (*thi)->_servicing;
-    if (task != (AsyncTask *)NULL && 
-        task->_state != AsyncTask::S_servicing_removed) {
-      tasks.push_back(task);
-    }
-  }
-
-  if (!tasks.empty()) {
-    indent(out, indent_level + 2)
-      << "Active tasks:\n";
-
-    sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority());
-
-    // Since AsyncTaskSortPriority() sorts backwards (because of STL's
-    // push_heap semantics), we go through the task list in reverse
-    // order to print them forwards.
-    TaskHeap::reverse_iterator ti;
-    int current_sort = tasks.back()->get_sort() - 1;
-    for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) {
-      AsyncTask *task = (*ti);
-      if (task->get_sort() != current_sort) {
-        current_sort = task->get_sort();
-        indent(out, indent_level + 2)
-          << "sort = " << current_sort << "\n";
-      }
-      if (task->_state == AsyncTask::S_servicing) {
-        indent(out, indent_level + 3)
-          << "*" << *task << "\n";
-      } else {
-        indent(out, indent_level + 4)
-          << *task << "\n";
-      }
-    }
-  }
-
-  if (!_sleeping.empty()) {
-    indent(out, indent_level + 2)
-      << "Sleeping tasks:\n";
-    double now = _clock->get_frame_time();
-
-    // Instead of iterating through the _sleeping list in heap order,
-    // copy it and then use repeated pops to get it out in sorted
-    // order, for the user's satisfaction.
-    TaskHeap sleeping = _sleeping;
-    while (!sleeping.empty()) {
-      PT(AsyncTask) task = sleeping.front();
-      pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime());
-      sleeping.pop_back();
-
-      indent(out, indent_level + 4)
-        << task->get_wake_time() - now << "s: "
-        << *task << "\n";
-    }
+  TaskChains::const_iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    chain->do_write(out, indent_level + 2);
   }
 }
 
@@ -570,29 +513,55 @@ write(ostream &out, int indent_level) const {
 ////////////////////////////////////////////////////////////////////
 bool AsyncTaskManager::
 do_has_task(AsyncTask *task) const {
-  return (find_task_on_heap(_active, task) != -1 ||
-          find_task_on_heap(_next_active, task) != -1 ||
-          find_task_on_heap(_sleeping, task) != -1);
+  TaskChains::const_iterator tci;
+  for (tci = _task_chains.begin();
+       tci != _task_chains.end();
+       ++tci) {
+    AsyncTaskChain *chain = (*tci);
+    if (chain->do_has_task(task)) {
+      return true;
+    }
+  }
+
+  return false;
 }
 
 ////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::find_task_on_heap
+//     Function: AsyncTaskManager::do_make_task_chain
 //       Access: Protected
-//  Description: Returns the index number of the indicated task within
-//               the specified task list, or -1 if the task is not
-//               found in the list (this may mean that it is currently
-//               being serviced).  Assumes that the lock is currently
-//               held.
+//  Description: Creates a new AsyncTaskChain of the indicated name
+//               and stores it within the AsyncTaskManager.  If a task
+//               chain with this name already exists, returns it
+//               instead.
+//
+//               Assumes the lock is held.
 ////////////////////////////////////////////////////////////////////
-int AsyncTaskManager::
-find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const {
-  for (int i = 0; i < (int)heap.size(); ++i) {
-    if (heap[i] == task) {
-      return i;
-    }
+AsyncTaskChain *AsyncTaskManager::
+do_make_task_chain(const string &name) {
+  PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
+
+  TaskChains::const_iterator tci = _task_chains.insert(chain).first;
+  return (*tci);
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::do_find_task_chain
+//       Access: Protected
+//  Description: Searches a new AsyncTaskChain of the indicated name
+//               and returns it if it exists, or NULL otherwise.
+//
+//               Assumes the lock is held.
+////////////////////////////////////////////////////////////////////
+AsyncTaskChain *AsyncTaskManager::
+do_find_task_chain(const string &name) {
+  PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
+
+  TaskChains::const_iterator tci = _task_chains.find(chain);
+  if (tci != _task_chains.end()) {
+    return (*tci);
   }
 
-  return -1;
+  return NULL;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -624,307 +593,3 @@ remove_task_by_name(AsyncTask *task) {
     nassertv(false);
   }
 }
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::service_one_task
-//       Access: Protected
-//  Description: Pops a single task off the active queue, services it,
-//               and restores it to the end of the queue.  This is
-//               called internally only within one of the task
-//               threads.  Assumes the lock is already held.
-////////////////////////////////////////////////////////////////////
-void AsyncTaskManager::
-service_one_task(AsyncTaskManager::AsyncTaskManagerThread *thread) {
-  if (!_active.empty()) {
-    PT(AsyncTask) task = _active.front();
-    pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
-    _active.pop_back();
-
-    if (thread != (AsyncTaskManager::AsyncTaskManagerThread *)NULL) {
-      thread->_servicing = task;
-    }
-
-    nassertv(task->get_sort() == _current_sort);
-    nassertv(task->_state == AsyncTask::S_active);
-    task->_state = AsyncTask::S_servicing;
-    task->_servicing_thread = thread;
-
-    // Now release the manager lock while we actually service the
-    // task.
-    _lock.release();
-    AsyncTask::DoneStatus ds = task->do_task();
-
-    // Now we have to re-acquire the manager lock, so we can put the
-    // task back on the queue (and so we can return with the lock
-    // still held).
-    _lock.lock();
-
-    if (thread != (AsyncTaskManager::AsyncTaskManagerThread *)NULL) {
-      thread->_servicing = NULL;
-    }
-    task->_servicing_thread = NULL;
-
-    if (task->_manager == this) {
-      if (task->_state == AsyncTask::S_servicing_removed) {
-        // This task wants to kill itself.
-        cleanup_task(task, false);
-
-      } else {
-        switch (ds) {
-        case AsyncTask::DS_cont:
-          // The task is still alive; put it on the next frame's active
-          // queue.
-          task->_state = AsyncTask::S_active;
-          _next_active.push_back(task);
-          _cvar.signal_all();
-          break;
-          
-        case AsyncTask::DS_again:
-          // The task wants to sleep again.
-          {
-            double now = _clock->get_frame_time();
-            task->_wake_time = now + task->get_delay();
-            task->_state = AsyncTask::S_sleeping;
-            _sleeping.push_back(task);
-            push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
-            _cvar.signal_all();
-          }
-          break;
-
-        case AsyncTask::DS_abort:
-          // The task had an exception and wants to raise a big flag.
-          cleanup_task(task, false);
-          if (_state == S_started) {
-            _state = S_aborting;
-            _cvar.signal_all();
-          }
-          break;
-          
-        default:
-          // The task has finished.
-          cleanup_task(task, true);
-        }
-      }
-    }
-  }
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::cleanup_task
-//       Access: Protected
-//  Description: Called internally when a task has completed (or been
-//               interrupted) and is about to be removed from the
-//               active queue.  Assumes the lock is held.
-////////////////////////////////////////////////////////////////////
-void AsyncTaskManager::
-cleanup_task(AsyncTask *task, bool clean_exit) {
-  nassertv(task->_manager == this);
-
-  task->_state = AsyncTask::S_inactive;
-  task->_manager = NULL;
-  --_num_tasks;
-
-  remove_task_by_name(task);
-
-  if (clean_exit && !task->_done_event.empty()) {
-    PT_Event event = new Event(task->_done_event);
-    event->add_parameter(EventParameter(task));
-    throw_event(event);
-  }
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::finish_sort_group
-//       Access: Protected
-//  Description: Called internally when all tasks of a given sort
-//               value have been completed, and it is time to
-//               increment to the next sort value, or begin the next
-//               epoch.  Assumes the lock is held.
-//
-//               Returns true if there are more tasks on the queue
-//               after this operation, or false if the task list is
-//               empty and we need to wait.
-////////////////////////////////////////////////////////////////////
-bool AsyncTaskManager::
-finish_sort_group() {
-  nassertr(_num_busy_threads == 0, true);
-
-  if (!_active.empty()) {
-    // There are more tasks; just set the next sort value.
-    nassertr(_current_sort < _active.front()->get_sort(), true);
-    _current_sort = _active.front()->get_sort();
-    _cvar.signal_all();
-    return true;
-  }
-
-  // There are no more tasks in this epoch; advance to the next epoch.
-  if (_tick_clock) {
-    _clock->tick();
-  }
-  if (!_threads.empty()) {
-    PStatClient::thread_tick(get_name());
-  }
-    
-  _active.swap(_next_active);
-
-  // Check for any sleeping tasks that need to be woken.
-  double now = _clock->get_frame_time();
-  while (!_sleeping.empty() && _sleeping.front()->get_wake_time() <= now) {
-    PT(AsyncTask) task = _sleeping.front();
-    pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
-    _sleeping.pop_back();
-    task->_state = AsyncTask::S_active;
-    _active.push_back(task);
-  }
-
-  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
-  nassertr(_num_tasks == _active.size() + _sleeping.size(), true);
-
-  if (!_active.empty()) {
-    // Get the first task on the queue.
-    _current_sort = _active.front()->get_sort();
-    _cvar.signal_all();
-    return true;
-  }
-
-  // There are no tasks to be had anywhere.  Chill.
-  _current_sort = INT_MAX;
-  return false;
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::do_stop_threads
-//       Access: Protected
-//  Description: The private implementation of stop_threads; assumes
-//               the lock is already held.
-////////////////////////////////////////////////////////////////////
-void AsyncTaskManager::
-do_stop_threads() {
-  if (_state == S_started || _state == S_aborting) {
-    _state = S_shutdown;
-    _cvar.signal_all();
-    
-    Threads wait_threads;
-    wait_threads.swap(_threads);
-    
-    // We have to release the lock while we join, so the threads can
-    // wake up and see that we're shutting down.
-    _lock.release();
-    Threads::iterator ti;
-    for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
-      (*ti)->join();
-    }
-    _lock.lock();
-    
-    _state = S_initial;
-    nassertv(_num_busy_threads == 0);
-  }
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::do_start_threads
-//       Access: Protected
-//  Description: The private implementation of start_threads; assumes
-//               the lock is already held.
-////////////////////////////////////////////////////////////////////
-void AsyncTaskManager::
-do_start_threads() {
-  if (_state == S_aborting) {
-    do_stop_threads();
-  }
-
-  if (_state == S_initial) {
-    _state = S_started;
-    _num_busy_threads = 0;
-    if (Thread::is_threading_supported()) {
-      _threads.reserve(_num_threads);
-      for (int i = 0; i < _num_threads; ++i) {
-        ostringstream strm;
-        strm << get_name() << "_" << i;
-        PT(AsyncTaskManagerThread) thread = new AsyncTaskManagerThread(strm.str(), this);
-        if (thread->start(TP_low, true)) {
-          _threads.push_back(thread);
-        }
-      }
-    }
-  }
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::do_poll
-//       Access: Protected
-//  Description: The private implementation of poll(), this assumes
-//               the lock is already held.
-////////////////////////////////////////////////////////////////////
-void AsyncTaskManager::
-do_poll() {
-  while (!_active.empty() && _state != S_shutdown && _state != S_aborting) {
-    _current_sort = _active.front()->get_sort();
-    service_one_task(NULL);
-  }
-
-  if (_state != S_shutdown && _state != S_aborting) {
-    finish_sort_group();
-  }
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::AsyncTaskManagerThread::Constructor
-//       Access: Public
-//  Description: 
-////////////////////////////////////////////////////////////////////
-AsyncTaskManager::AsyncTaskManagerThread::
-AsyncTaskManagerThread(const string &name, AsyncTaskManager *manager) :
-  Thread(name, manager->get_name()),
-  _manager(manager),
-  _servicing(NULL)
-{
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: AsyncTaskManager::AsyncTaskManagerThread::thread_main
-//       Access: Public, Virtual
-//  Description: 
-////////////////////////////////////////////////////////////////////
-void AsyncTaskManager::AsyncTaskManagerThread::
-thread_main() {
-  MutexHolder holder(_manager->_lock);
-  while (_manager->_state != S_shutdown && _manager->_state != S_aborting) {
-    if (!_manager->_active.empty() &&
-        _manager->_active.front()->get_sort() == _manager->_current_sort) {
-      PStatTimer timer(_task_pcollector);
-      _manager->_num_busy_threads++;
-      _manager->service_one_task(this);
-      _manager->_num_busy_threads--;
-      _manager->_cvar.signal_all();
-
-    } else {
-      // We've finished all the available tasks of the current sort
-      // value.  We can't pick up a new task until all of the threads
-      // finish the tasks with the same sort value.
-      if (_manager->_num_busy_threads == 0) {
-        // We're the last thread to finish.  Update _current_sort.
-        if (!_manager->finish_sort_group()) {
-          // Nothing to do.  Wait for more tasks to be added.
-          if (_manager->_sleeping.empty()) {
-            PStatTimer timer(_wait_pcollector);
-            _manager->_cvar.wait();
-          } else {
-            double wake_time = _manager->_sleeping.front()->get_wake_time();
-            double now = _manager->_clock->get_frame_time();
-            double timeout = max(wake_time - now, 0.0);
-            PStatTimer timer(_wait_pcollector);
-            _manager->_cvar.wait(timeout);
-          }            
-        }
-
-      } else {
-        // Wait for the other threads to finish their current task
-        // before we continue.
-        PStatTimer timer(_wait_pcollector);
-        _manager->_cvar.wait();
-      }
-    }
-  }
-}
-

+ 24 - 74
panda/src/event/asyncTaskManager.h

@@ -19,6 +19,7 @@
 
 #include "asyncTask.h"
 #include "asyncTaskCollection.h"
+#include "asyncTaskChain.h"
 #include "typedReferenceCount.h"
 #include "thread.h"
 #include "pmutex.h"
@@ -27,6 +28,8 @@
 #include "pdeque.h"
 #include "pStatCollector.h"
 #include "clockObject.h"
+#include "ordered_vector.h"
+#include "indirectCompareNames.h"
 
 ////////////////////////////////////////////////////////////////////
 //       Class : AsyncTaskManager
@@ -52,22 +55,17 @@
 ////////////////////////////////////////////////////////////////////
 class EXPCL_PANDA_EVENT AsyncTaskManager : public TypedReferenceCount, public Namable {
 PUBLISHED:
-  AsyncTaskManager(const string &name, int num_threads = 0);
+  AsyncTaskManager(const string &name);
   BLOCKING virtual ~AsyncTaskManager();
 
   INLINE void set_clock(ClockObject *clock);
   INLINE ClockObject *get_clock();
 
-  INLINE void set_tick_clock(bool tick_clock);
-  INLINE bool get_tick_clock() const;
-
-  BLOCKING void set_num_threads(int num_threads);
-  INLINE int get_num_threads() const;
-  INLINE int get_num_running_threads() const;
-
-  BLOCKING void stop_threads();
-  void start_threads();
-  INLINE bool is_started() const;
+  int get_num_task_chains() const;
+  AsyncTaskChain *get_task_chain(int n) const;
+  AsyncTaskChain *make_task_chain(const string &name);
+  AsyncTaskChain *find_task_chain(const string &name);
+  BLOCKING bool remove_task_chain(const string &name);
 
   void add(AsyncTask *task);
   bool has_task(AsyncTask *task) const;
@@ -80,12 +78,14 @@ PUBLISHED:
   int remove(const AsyncTaskCollection &tasks);
 
   BLOCKING void wait_for_tasks();
+  BLOCKING void stop_threads();
+  void start_threads();
 
   INLINE int get_num_tasks() const;
 
-  AsyncTaskCollection get_tasks();
-  AsyncTaskCollection get_active_tasks();
-  AsyncTaskCollection get_sleeping_tasks();
+  AsyncTaskCollection get_tasks() const;
+  AsyncTaskCollection get_active_tasks() const;
+  AsyncTaskCollection get_sleeping_tasks() const;
 
   void poll();
 
@@ -93,49 +93,15 @@ PUBLISHED:
   virtual void write(ostream &out, int indent_level = 0) const;
 
 protected:
-  class AsyncTaskManagerThread;
-  typedef pvector< PT(AsyncTask) > TaskHeap;
-
-  bool do_has_task(AsyncTask *task) const;
-  int find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const;
+  AsyncTaskChain *do_make_task_chain(const string &name);
+  AsyncTaskChain *do_find_task_chain(const string &name);
 
   INLINE void add_task_by_name(AsyncTask *task);
   void remove_task_by_name(AsyncTask *task);
 
-  void service_one_task(AsyncTaskManagerThread *thread);
-  void cleanup_task(AsyncTask *task, bool clean_exit);
-  bool finish_sort_group();
-  void do_stop_threads();
-  void do_start_threads();
-  void do_poll();
+  bool do_has_task(AsyncTask *task) const;
 
 protected:
-  class AsyncTaskManagerThread : public Thread {
-  public:
-    AsyncTaskManagerThread(const string &name, AsyncTaskManager *manager);
-    virtual void thread_main();
-
-    AsyncTaskManager *_manager;
-    AsyncTask *_servicing;
-  };
-
-  class AsyncTaskSortWakeTime {
-  public:
-    bool operator () (AsyncTask *a, AsyncTask *b) const {
-      return a->get_wake_time() > b->get_wake_time();
-    }
-  };
-  
-  class AsyncTaskSortPriority {
-  public:
-    bool operator () (AsyncTask *a, AsyncTask *b) const {
-      if (a->get_sort() != b->get_sort()) {
-        return a->get_sort() > b->get_sort();
-      }
-      return a->get_priority() < b->get_priority();
-    }
-  };
-  
   class AsyncTaskSortName {
   public:
     bool operator () (AsyncTask *a, AsyncTask *b) const {
@@ -143,35 +109,18 @@ protected:
     }
   };
 
-  typedef pvector< PT(AsyncTaskManagerThread) > Threads;
   typedef pmultiset<AsyncTask *, AsyncTaskSortName> TasksByName;
 
-  int _num_threads;
+  // Protects all the following members.  This same lock is also used
+  // to protect all of our AsyncTaskChain members.
+  Mutex _lock; 
 
-  Mutex _lock;  // Protects all the following members.
-  ConditionVarFull _cvar;  // signaled when _active, _next_active, _sleeping, _state, or _current_sort changes, or a task finishes.
-
-  enum State {
-    S_initial,  // no threads yet
-    S_started,  // threads have been started
-    S_aborting, // task returned DS_abort, shutdown requested from sub-thread.
-    S_shutdown  // waiting for thread shutdown, requested from main thread
-  };
+  typedef ov_set<PT(AsyncTaskChain), IndirectCompareNames<AsyncTaskChain> > TaskChains;
+  TaskChains _task_chains;
 
-  Threads _threads;
   int _num_tasks;
-  int _num_busy_threads;
-  TaskHeap _active;
-  TaskHeap _next_active;
-  TaskHeap _sleeping;
   TasksByName _tasks_by_name;
-  State _state;
-  int _current_sort;
   PT(ClockObject) _clock;
-  bool _tick_clock;
-  
-  static PStatCollector _task_pcollector;
-  static PStatCollector _wait_pcollector;
 
 public:
   static TypeHandle get_class_type() {
@@ -190,7 +139,8 @@ public:
 private:
   static TypeHandle _type_handle;
 
-  friend class AsyncTaskManagerThread;
+  friend class AsyncTaskChain;
+  friend class AsyncTaskChain::AsyncTaskChainThread;
   friend class AsyncTask;
 };
 

+ 2 - 0
panda/src/event/config_event.cxx

@@ -14,6 +14,7 @@
 
 #include "config_event.h"
 #include "asyncTask.h"
+#include "asyncTaskChain.h"
 #include "asyncTaskManager.h"
 #include "buttonEventList.h"
 #include "event.h"
@@ -29,6 +30,7 @@ NotifyCategoryDef(event, "");
 
 ConfigureFn(config_event) {
   AsyncTask::init_type();
+  AsyncTaskChain::init_type();
   AsyncTaskManager::init_type();
   ButtonEventList::init_type();
   PointerEventList::init_type();

+ 1 - 0
panda/src/event/event_composite1.cxx

@@ -1,4 +1,5 @@
 #include "asyncTask.cxx"
+#include "asyncTaskChain.cxx"
 #include "asyncTaskCollection.cxx"
 #include "asyncTaskManager.cxx"
 #include "buttonEvent.cxx"

+ 4 - 2
panda/src/event/test_task.cxx

@@ -49,8 +49,10 @@ static const int num_threads = 10;
 
 int
 main(int argc, char *argv[]) {
-  PT(AsyncTaskManager) task_mgr = new AsyncTaskManager("task_mgr", num_threads);
-  task_mgr->set_tick_clock(true);
+  PT(AsyncTaskManager) task_mgr = new AsyncTaskManager("task_mgr");
+  PT(AsyncTaskChain) chain = task_mgr->make_task_chain("");
+  chain->set_tick_clock(true);
+  chain->set_num_threads(num_threads);
 
   PerlinNoise2 length_noise(grid_size, grid_size);
   PerlinNoise2 delay_noise(grid_size, grid_size);